From 0d15ff57e63690e42b3ae448e0d153ccc710d88c Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 18 Aug 2021 17:36:30 +0200 Subject: [PATCH] server: Implement schema migrations --- etcdutl/etcdutl/migrate_command.go | 27 +-- server/storage/schema/actions.go | 93 +++++++++ server/storage/schema/actions_test.go | 170 +++++++++++++++++ server/storage/schema/changes.go | 50 +++++ server/storage/schema/changes_test.go | 61 ++++++ server/storage/schema/migration.go | 119 ++++++++++++ server/storage/schema/migration_test.go | 179 ++++++++++++++++++ server/storage/schema/schema.go | 85 ++++++--- server/storage/schema/schema_test.go | 240 ++++++++++++++++++------ tests/e2e/utl_migrate_test.go | 10 +- 10 files changed, 940 insertions(+), 94 deletions(-) create mode 100644 server/storage/schema/actions.go create mode 100644 server/storage/schema/actions_test.go create mode 100644 server/storage/schema/changes.go create mode 100644 server/storage/schema/changes_test.go create mode 100644 server/storage/schema/migration.go create mode 100644 server/storage/schema/migration_test.go diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index d83ed82e9..9c429e716 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -103,29 +103,30 @@ func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() lg := GetLogger() tx := c.be.BatchTx() - tx.Lock() current, err := schema.DetectSchemaVersion(lg, tx) if err != nil { - tx.Unlock() lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err } - if *current == *c.targetVersion { - tx.Unlock() - lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(current))) + if current == *c.targetVersion { + lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(¤t))) return nil } - if c.force { - unsafeMigrateForce(lg, tx, c.targetVersion) - tx.Unlock() - c.be.ForceCommit() - return nil + err = schema.Migrate(lg, tx, *c.targetVersion) + if err != nil { + if !c.force { + return err + } + lg.Info("normal migrate failed, trying with force", zap.Error(err)) + migrateForce(lg, tx, c.targetVersion) } - tx.Unlock() - return fmt.Errorf("storage version migration is not yet supported") + c.be.ForceCommit() + return nil } -func unsafeMigrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { +func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { + tx.Lock() + defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { schema.UnsafeClearStorageVersion(tx) diff --git a/server/storage/schema/actions.go b/server/storage/schema/actions.go new file mode 100644 index 000000000..103a2ef7a --- /dev/null +++ b/server/storage/schema/actions.go @@ -0,0 +1,93 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "go.etcd.io/etcd/server/v3/storage/backend" + "go.uber.org/zap" +) + +type action interface { + // unsafeDo executes the action and returns revert action, when executed + // should restore the state from before. + unsafeDo(tx backend.BatchTx) (revert action, err error) +} + +type setKeyAction struct { + Bucket backend.Bucket + FieldName []byte + FieldValue []byte +} + +func (a setKeyAction) unsafeDo(tx backend.BatchTx) (action, error) { + revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName) + tx.UnsafePut(a.Bucket, a.FieldName, a.FieldValue) + return revert, nil +} + +type deleteKeyAction struct { + Bucket backend.Bucket + FieldName []byte +} + +func (a deleteKeyAction) unsafeDo(tx backend.BatchTx) (action, error) { + revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName) + tx.UnsafeDelete(a.Bucket, a.FieldName) + return revert, nil +} + +func restoreFieldValueAction(tx backend.BatchTx, bucket backend.Bucket, fieldName []byte) action { + _, vs := tx.UnsafeRange(bucket, fieldName, nil, 1) + if len(vs) == 1 { + return &setKeyAction{ + Bucket: bucket, + FieldName: fieldName, + FieldValue: vs[0], + } + } + return &deleteKeyAction{ + Bucket: bucket, + FieldName: fieldName, + } +} + +type ActionList []action + +// unsafeExecute executes actions one by one. If one of actions returns error, +// it will revert them. +func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error { + var revertActions = make(ActionList, 0, len(as)) + for _, a := range as { + revert, err := a.unsafeDo(tx) + + if err != nil { + revertActions.unsafeExecuteInReverseOrder(lg, tx) + return err + } + revertActions = append(revertActions, revert) + } + return nil +} + +// unsafeExecuteInReverseOrder executes actions in revered order. Will panic on +// action error. Should be used when reverting. +func (as ActionList) unsafeExecuteInReverseOrder(lg *zap.Logger, tx backend.BatchTx) { + for j := len(as) - 1; j >= 0; j-- { + _, err := as[j].unsafeDo(tx) + if err != nil { + lg.Panic("Cannot recover from revert error", zap.Error(err)) + } + } +} diff --git a/server/storage/schema/actions_test.go b/server/storage/schema/actions_test.go new file mode 100644 index 000000000..ee43d69f4 --- /dev/null +++ b/server/storage/schema/actions_test.go @@ -0,0 +1,170 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/storage/backend" + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "go.uber.org/zap/zaptest" +) + +func TestActionIsReversible(t *testing.T) { + tcs := []struct { + name string + action action + state map[string]string + }{ + { + name: "setKeyAction empty state", + action: setKeyAction{ + Bucket: Meta, + FieldName: []byte("/test"), + FieldValue: []byte("1"), + }, + }, + { + name: "setKeyAction with key", + action: setKeyAction{ + Bucket: Meta, + FieldName: []byte("/test"), + FieldValue: []byte("1"), + }, + state: map[string]string{"/test": "2"}, + }, + { + name: "deleteKeyAction empty state", + action: deleteKeyAction{ + Bucket: Meta, + FieldName: []byte("/test"), + }, + }, + { + name: "deleteKeyAction with key", + action: deleteKeyAction{ + Bucket: Meta, + FieldName: []byte("/test"), + }, + state: map[string]string{"/test": "2"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + defer be.Close() + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + defer tx.Unlock() + UnsafeCreateMetaBucket(tx) + putKeyValues(tx, Meta, tc.state) + + assertBucketState(t, tx, Meta, tc.state) + reverse, err := tc.action.unsafeDo(tx) + if err != nil { + t.Errorf("Failed to upgrade, err: %v", err) + } + _, err = reverse.unsafeDo(tx) + if err != nil { + t.Errorf("Failed to downgrade, err: %v", err) + } + assertBucketState(t, tx, Meta, tc.state) + }) + } +} + +func TestActionListRevert(t *testing.T) { + tcs := []struct { + name string + + actions ActionList + expectState map[string]string + expectError error + }{ + { + name: "Apply multiple actions", + actions: ActionList{ + setKeyAction{Meta, []byte("/testKey1"), []byte("testValue1")}, + setKeyAction{Meta, []byte("/testKey2"), []byte("testValue2")}, + }, + expectState: map[string]string{"/testKey1": "testValue1", "/testKey2": "testValue2"}, + }, + { + name: "Broken action should result in changes reverted", + actions: ActionList{ + setKeyAction{Meta, []byte("/testKey1"), []byte("testValue1")}, + brokenAction{}, + setKeyAction{Meta, []byte("/testKey2"), []byte("testValue2")}, + }, + expectState: map[string]string{}, + expectError: errBrokenAction, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zaptest.NewLogger(t) + + be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + defer be.Close() + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + defer tx.Unlock() + + UnsafeCreateMetaBucket(tx) + err := tc.actions.unsafeExecute(lg, tx) + if err != tc.expectError { + t.Errorf("Unexpected error or lack thereof, expected: %v, got: %v", tc.expectError, err) + } + assertBucketState(t, tx, Meta, tc.expectState) + }) + } +} + +type brokenAction struct{} + +var errBrokenAction = fmt.Errorf("broken action error") + +func (c brokenAction) unsafeDo(tx backend.BatchTx) (action, error) { + return nil, errBrokenAction +} + +func putKeyValues(tx backend.BatchTx, bucket backend.Bucket, kvs map[string]string) { + for k, v := range kvs { + tx.UnsafePut(bucket, []byte(k), []byte(v)) + } +} + +func assertBucketState(t *testing.T, tx backend.BatchTx, bucket backend.Bucket, expect map[string]string) { + t.Helper() + got := map[string]string{} + ks, vs := tx.UnsafeRange(bucket, []byte("\x00"), []byte("\xff"), 0) + for i := 0; i < len(ks); i++ { + got[string(ks[i])] = string(vs[i]) + } + if expect == nil { + expect = map[string]string{} + } + assert.Equal(t, expect, got) +} diff --git a/server/storage/schema/changes.go b/server/storage/schema/changes.go new file mode 100644 index 000000000..6eb0b7512 --- /dev/null +++ b/server/storage/schema/changes.go @@ -0,0 +1,50 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import "go.etcd.io/etcd/server/v3/storage/backend" + +type schemaChange interface { + upgradeAction() action + downgradeAction() action +} + +// addNewField represents adding new field when upgrading. Downgrade will remove the field. +func addNewField(bucket backend.Bucket, fieldName []byte, fieldValue []byte) schemaChange { + return simpleSchemaChange{ + upgrade: setKeyAction{ + Bucket: bucket, + FieldName: fieldName, + FieldValue: fieldValue, + }, + downgrade: deleteKeyAction{ + Bucket: bucket, + FieldName: fieldName, + }, + } +} + +type simpleSchemaChange struct { + upgrade action + downgrade action +} + +func (c simpleSchemaChange) upgradeAction() action { + return c.upgrade +} + +func (c simpleSchemaChange) downgradeAction() action { + return c.downgrade +} diff --git a/server/storage/schema/changes_test.go b/server/storage/schema/changes_test.go new file mode 100644 index 000000000..05b8d49cf --- /dev/null +++ b/server/storage/schema/changes_test.go @@ -0,0 +1,61 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "testing" + "time" + + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" +) + +func TestUpgradeDowngrade(t *testing.T) { + tcs := []struct { + name string + change schemaChange + expectStateAfterUpgrade map[string]string + expectStateAfterDowngrade map[string]string + }{ + { + name: "addNewField empty", + change: addNewField(Meta, []byte("/test"), []byte("1")), + expectStateAfterUpgrade: map[string]string{"/test": "1"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + defer be.Close() + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + defer tx.Unlock() + UnsafeCreateMetaBucket(tx) + + _, err := tc.change.upgradeAction().unsafeDo(tx) + if err != nil { + t.Errorf("Failed to upgrade, err: %v", err) + } + assertBucketState(t, tx, Meta, tc.expectStateAfterUpgrade) + _, err = tc.change.downgradeAction().unsafeDo(tx) + if err != nil { + t.Errorf("Failed to downgrade, err: %v", err) + } + assertBucketState(t, tx, Meta, tc.expectStateAfterDowngrade) + }) + } +} diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go new file mode 100644 index 000000000..00bf60605 --- /dev/null +++ b/server/storage/schema/migration.go @@ -0,0 +1,119 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + + "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.uber.org/zap" +) + +type migrationPlan []migrationStep + +func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) { + if current.Major != target.Major { + lg.Error("Changing major storage version is not supported", + zap.String("storage-version", current.String()), + zap.String("target-storage-version", target.String()), + ) + return nil, fmt.Errorf("Changing major storage version is not supported") + } + // TODO(serathius): Implement downgrades + if current.Minor > target.Minor { + lg.Error("Target version is lower than the current version, downgrades are not yet supported", + zap.String("storage-version", current.String()), + zap.String("target-storage-version", target.String()), + ) + return nil, fmt.Errorf("downgrades are not yet supported") + } + return buildPlan(current, target) +} + +func buildPlan(current semver.Version, target semver.Version) (plan migrationPlan, err error) { + for current.Minor != target.Minor { + isUpgrade := current.Minor < target.Minor + + changes, err := schemaChangesForVersion(current, isUpgrade) + if err != nil { + return plan, err + } + step := newMigrationStep(current, isUpgrade, changes) + plan = append(plan, step) + current = step.target + } + return plan, nil +} + +func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { + tx.Lock() + defer tx.Unlock() + return p.unsafeExecute(lg, tx) +} + +func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) (err error) { + for _, s := range p { + err = s.unsafeExecute(lg, tx) + if err != nil { + return err + } + lg.Info("upgraded storage version", zap.String("new-storage-version", s.target.String())) + } + return nil +} + +// migrationStep represents a single migrationStep of migrating etcd storage between two minor versions. +type migrationStep struct { + target semver.Version + actions ActionList +} + +func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) (step migrationStep) { + step.actions = make(ActionList, len(changes)) + for i, change := range changes { + if isUpgrade { + step.actions[i] = change.upgradeAction() + } else { + step.actions[len(changes)-1-i] = change.downgradeAction() + } + } + if isUpgrade { + step.target = semver.Version{Major: v.Major, Minor: v.Minor + 1} + } else { + step.target = semver.Version{Major: v.Major, Minor: v.Minor - 1} + } + return step +} + +// execute runs actions required to migrate etcd storage between two minor versions. +func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { + tx.Lock() + defer tx.Unlock() + return s.unsafeExecute(lg, tx) +} + +// unsafeExecute is non thread-safe version of execute. +func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error { + err := s.actions.unsafeExecute(lg, tx) + if err != nil { + return err + } + // Storage version is available since v3.6, downgrading target v3.5 should clean this field. + if !s.target.LessThan(V3_6) { + UnsafeSetStorageVersion(tx, &s.target) + } + return nil +} diff --git a/server/storage/schema/migration_test.go b/server/storage/schema/migration_test.go new file mode 100644 index 000000000..24175b310 --- /dev/null +++ b/server/storage/schema/migration_test.go @@ -0,0 +1,179 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + "testing" + "time" + + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/storage/backend" + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "go.uber.org/zap/zaptest" +) + +func TestMigrationStepExecute(t *testing.T) { + recorder := &actionRecorder{} + errorC := fmt.Errorf("error C") + tcs := []struct { + name string + + currentVersion semver.Version + isUpgrade bool + changes []schemaChange + + expectError error + expectVersion *semver.Version + expectRecordedActions []string + }{ + { + name: "Upgrade execute changes in order and updates version", + currentVersion: semver.Version{Major: 99, Minor: 0}, + isUpgrade: true, + changes: []schemaChange{ + recorder.changeMock("A"), + recorder.changeMock("B"), + }, + + expectVersion: &semver.Version{Major: 99, Minor: 1}, + expectRecordedActions: []string{"upgrade A", "upgrade B"}, + }, + { + name: "Downgrade execute changes in reversed order and downgrades version", + currentVersion: semver.Version{Major: 99, Minor: 1}, + isUpgrade: false, + changes: []schemaChange{ + recorder.changeMock("A"), + recorder.changeMock("B"), + }, + + expectVersion: &semver.Version{Major: 99, Minor: 0}, + expectRecordedActions: []string{"downgrade B", "downgrade A"}, + }, + { + name: "Failure during upgrade should revert previous changes in reversed order and not change version", + currentVersion: semver.Version{Major: 99, Minor: 0}, + isUpgrade: true, + changes: []schemaChange{ + recorder.changeMock("A"), + recorder.changeMock("B"), + recorder.changeError(errorC), + recorder.changeMock("D"), + recorder.changeMock("E"), + }, + + expectVersion: &semver.Version{Major: 99, Minor: 0}, + expectRecordedActions: []string{"upgrade A", "upgrade B", "upgrade error C", "revert upgrade B", "revert upgrade A"}, + expectError: errorC, + }, + { + name: "Failure during downgrade should revert previous changes in reversed order and not change version", + currentVersion: semver.Version{Major: 99, Minor: 0}, + isUpgrade: false, + changes: []schemaChange{ + recorder.changeMock("A"), + recorder.changeMock("B"), + recorder.changeError(errorC), + recorder.changeMock("D"), + recorder.changeMock("E"), + }, + + expectVersion: &semver.Version{Major: 99, Minor: 0}, + expectRecordedActions: []string{"downgrade E", "downgrade D", "downgrade error C", "revert downgrade D", "revert downgrade E"}, + expectError: errorC, + }, + { + name: "Downgrade below to below v3.6 doesn't leave storage version as it was not supported then", + currentVersion: semver.Version{Major: 3, Minor: 6}, + changes: schemaChanges[V3_6], + isUpgrade: false, + expectVersion: nil, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + recorder.actions = []string{} + if tc.expectRecordedActions == nil { + tc.expectRecordedActions = []string{} + } + lg := zaptest.NewLogger(t) + + be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + defer be.Close() + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + defer tx.Unlock() + + UnsafeCreateMetaBucket(tx) + UnsafeSetStorageVersion(tx, &tc.currentVersion) + + step := newMigrationStep(tc.currentVersion, tc.isUpgrade, tc.changes) + err := step.unsafeExecute(lg, tx) + if err != tc.expectError { + t.Errorf("Unexpected error or lack thereof, expected: %v, got: %v", tc.expectError, err) + } + v := UnsafeReadStorageVersion(tx) + assert.Equal(t, tc.expectVersion, v) + assert.Equal(t, tc.expectRecordedActions, recorder.actions) + }) + } +} + +type actionRecorder struct { + actions []string +} + +func (r *actionRecorder) changeMock(name string) schemaChange { + return changeMock(r, name, nil) +} + +func (r *actionRecorder) changeError(err error) schemaChange { + return changeMock(r, fmt.Sprintf("%v", err), err) +} + +func changeMock(recorder *actionRecorder, name string, err error) schemaChange { + return simpleSchemaChange{ + upgrade: actionMock{ + recorder: recorder, + name: "upgrade " + name, + err: err, + }, + downgrade: actionMock{ + recorder: recorder, + name: "downgrade " + name, + err: err, + }, + } +} + +type actionMock struct { + recorder *actionRecorder + name string + err error +} + +func (a actionMock) unsafeDo(tx backend.BatchTx) (action, error) { + a.recorder.actions = append(a.recorder.actions, a.name) + return actionMock{ + recorder: a.recorder, + name: "revert " + a.name, + }, a.err +} diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 7750e1155..e498e8285 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -18,49 +18,92 @@ import ( "fmt" "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/api/v3/version" "go.uber.org/zap" "go.etcd.io/etcd/server/v3/storage/backend" ) var ( - V3_5 = semver.Version{Major: 3, Minor: 5} - V3_6 = semver.Version{Major: 3, Minor: 6} + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} + currentVersion semver.Version ) -// UpdateStorageSchema updates storage version. +func init() { + v := semver.New(version.Version) + currentVersion = semver.Version{Major: v.Major, Minor: v.Minor} +} + +// UpdateStorageSchema updates storage schema to etcd binary version. func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error { + return Migrate(lg, tx, currentVersion) +} + +// Migrate updates storage schema to provided target version. +func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { tx.Lock() defer tx.Unlock() - v, err := DetectSchemaVersion(lg, tx) + current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { return fmt.Errorf("cannot determine storage version: %w", err) } - switch *v { - case V3_5: - lg.Warn("setting storage version", zap.String("storage-version", V3_6.String())) - // All meta keys introduced in v3.6 should be filled in here. - UnsafeSetStorageVersion(tx, &V3_6) - case V3_6: - default: - lg.Warn("unknown storage version", zap.String("storage-version", v.String())) + plan, err := newPlan(lg, current, target) + if err != nil { + return fmt.Errorf("cannot create migration plan: %w", err) } - return nil + return plan.unsafeExecute(lg, tx) } -func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) { - v := UnsafeReadStorageVersion(tx) - if v != nil { - return v, nil +// DetectSchemaVersion returns version of storage schema. Returned value depends on etcd version that created the backend. For +// * v3.6 and newer will return storage version. +// * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). +// * v3.4 and older is not supported and will return error. +func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { + tx.Lock() + defer tx.Unlock() + return UnsafeDetectSchemaVersion(lg, tx) +} + +// UnsafeDetectSchemaVersion non thread safe version of DetectSchemaVersion. +func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { + vp := UnsafeReadStorageVersion(tx) + if vp != nil { + return *vp, nil } confstate := UnsafeConfStateFromBackend(lg, tx) if confstate == nil { - return nil, fmt.Errorf("missing confstate information") + return v, fmt.Errorf("missing confstate information") } _, term := UnsafeReadConsistentIndex(tx) if term == 0 { - return nil, fmt.Errorf("missing term information") + return v, fmt.Errorf("missing term information") } - copied := V3_5 - return &copied, nil + return V3_5, nil } + +func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) { + // changes should be taken from higher version + if isUpgrade { + v = semver.Version{Major: v.Major, Minor: v.Minor + 1} + } + + actions, found := schemaChanges[v] + if !found { + return nil, fmt.Errorf("version %q is not supported", v.String()) + } + return actions, nil +} + +var ( + // schemaChanges list changes that were introduced in perticular version. + // schema was introduced in v3.6 as so its changes were not tracked before. + schemaChanges = map[semver.Version][]schemaChange{ + V3_6: { + addNewField(Meta, MetaStorageVersionName, emptyStorageVersion), + }, + } + // emptyStorageVersion is used for v3.6 Step for the first time, in all other version StoragetVersion should be set by migrator. + // Adding a addNewField for StorageVersion we can reuselogic to remove it when downgrading to v3.5 + emptyStorageVersion = []byte("") +) diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index f34abe0aa..c919751c2 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -15,80 +15,111 @@ package schema import ( + "fmt" "testing" "time" "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" "go.uber.org/zap" ) -func TestUpdateStorageVersion(t *testing.T) { +var ( + V3_7 = semver.Version{Major: 3, Minor: 7} +) + +func TestMigrate(t *testing.T) { tcs := []struct { - name string - version string - setupKeys func(tx backend.BatchTx) - expectVersion *semver.Version - expectError bool - expectedErrorMsg string + name string + version semver.Version + // Overrides which keys should be set (default based on version) + overrideKeys func(tx backend.BatchTx) + targetVersion semver.Version + + expectVersion *semver.Version + expectError bool + expectErrorMsg string }{ + // As storage version field was added in v3.6, for v3.5 we will not set it. + // For storage to be considered v3.5 it have both confstate and term key set. { - name: `Backend before 3.6 without confstate should be rejected`, - version: "", - expectVersion: nil, - setupKeys: func(tx backend.BatchTx) {}, - expectError: true, - expectedErrorMsg: `cannot determine storage version: missing confstate information`, + name: `Upgrading v3.5 to v3.6 should be rejected if confstate is not set`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) {}, + targetVersion: V3_6, + expectVersion: nil, + expectError: true, + expectErrorMsg: `cannot determine storage version: missing confstate information`, }, { - name: `Backend before 3.6 without term should be rejected`, - version: "", - setupKeys: func(tx backend.BatchTx) { + name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) { MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) }, - expectVersion: nil, - expectError: true, - expectedErrorMsg: `cannot determine storage version: missing term information`, + targetVersion: V3_6, + expectVersion: nil, + expectError: true, + expectErrorMsg: `cannot determine storage version: missing term information`, }, { - name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", - version: "", - setupKeys: func(tx backend.BatchTx) { - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - }, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`, + version: V3_5, + targetVersion: V3_6, + expectVersion: &V3_6, }, { - name: "Backend in 3.6.0 should be skipped", - version: "3.6.0", - setupKeys: func(tx backend.BatchTx) { - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - }, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: `Migrate on same v3.5 version passes and doesn't set storage version'`, + version: V3_5, + targetVersion: V3_5, + expectVersion: nil, }, { - name: "Backend with current version should be skipped", - version: version.Version, - setupKeys: func(tx backend.BatchTx) { - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - }, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: `Migrate on same v3.6 version passes`, + version: V3_6, + targetVersion: V3_6, + expectVersion: &V3_6, }, { - name: "Backend in 3.7.0 should be skipped", - version: "3.7.0", - setupKeys: func(tx backend.BatchTx) { - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - }, - expectVersion: &semver.Version{Major: 3, Minor: 7}, + name: `Migrate on same v3.7 version passes`, + version: V3_7, + targetVersion: V3_7, + expectVersion: &V3_7, + }, + { + name: "Upgrading 3.6 to v3.7 is not supported", + version: V3_6, + targetVersion: V3_7, + expectVersion: &V3_6, + expectError: true, + expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`, + }, + { + name: "Downgrading v3.7 to v3.6 is not supported", + version: V3_7, + targetVersion: V3_6, + expectVersion: &V3_7, + expectError: true, + expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + }, + { + name: "Downgrading v3.6 to v3.5 is not supported", + version: V3_6, + targetVersion: V3_5, + expectVersion: &V3_6, + expectError: true, + expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + }, + { + name: "Downgrading v3.5 to v3.4 is not supported", + version: V3_5, + targetVersion: V3_4, + expectVersion: nil, + expectError: true, + expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, }, } for _, tc := range tcs { @@ -101,9 +132,10 @@ func TestUpdateStorageVersion(t *testing.T) { } tx.Lock() UnsafeCreateMetaBucket(tx) - tc.setupKeys(tx) - if tc.version != "" { - UnsafeSetStorageVersion(tx, semver.New(tc.version)) + if tc.overrideKeys != nil { + tc.overrideKeys(tx) + } else { + setupKeys(t, tx, tc.version) } tx.Unlock() be.ForceCommit() @@ -111,15 +143,113 @@ func TestUpdateStorageVersion(t *testing.T) { b := backend.NewDefaultBackend(tmpPath) defer b.Close() - err := UpdateStorageSchema(lg, b.BatchTx()) + err := Migrate(lg, b.BatchTx(), tc.targetVersion) if (err != nil) != tc.expectError { - t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError) + t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError) } - if err != nil && err.Error() != tc.expectedErrorMsg { - t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg) + if err != nil && err.Error() != tc.expectErrorMsg { + t.Errorf("Migrate(lg, tx, %q) = %q, expected error message: %q", tc.targetVersion, err, tc.expectErrorMsg) } v := UnsafeReadStorageVersion(b.BatchTx()) assert.Equal(t, tc.expectVersion, v) }) } } + +func TestMigrateIsReversible(t *testing.T) { + tcs := []struct { + initialVersion semver.Version + state map[string]string + }{ + { + initialVersion: V3_5, + state: map[string]string{ + "confState": `{"auto_leave":false}`, + "consistent_index": "\x00\x00\x00\x00\x00\x00\x00\x01", + "term": "\x00\x00\x00\x00\x00\x00\x00\x01", + }, + }, + { + initialVersion: V3_6, + state: map[string]string{ + "confState": `{"auto_leave":false}`, + "consistent_index": "\x00\x00\x00\x00\x00\x00\x00\x01", + "term": "\x00\x00\x00\x00\x00\x00\x00\x01", + "storageVersion": "3.6.0", + }, + }, + } + for _, tc := range tcs { + t.Run(tc.initialVersion.String(), func(t *testing.T) { + lg := zap.NewNop() + be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + defer be.Close() + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + UnsafeCreateMetaBucket(tx) + setupKeys(t, tx, tc.initialVersion) + assertBucketState(t, tx, Meta, tc.state) + tx.Unlock() + + // Upgrade to current version + tx.Lock() + err := testUnsafeMigrate(lg, be.BatchTx(), currentVersion) + if err != nil { + t.Errorf("Migrate(lg, tx, %q) returned error %+v", currentVersion, err) + } + assert.Equal(t, ¤tVersion, UnsafeReadStorageVersion(tx)) + tx.Unlock() + + // Downgrade back to initial version + tx.Lock() + err = testUnsafeMigrate(lg, be.BatchTx(), tc.initialVersion) + if err != nil { + t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err) + } + tx.Unlock() + + // Assert that all changes were revered + tx.Lock() + assertBucketState(t, tx, Meta, tc.state) + tx.Unlock() + }) + } +} + +// Does the same as UnsafeMigrate but skips version checks +// TODO(serathius): Use UnsafeMigrate when downgrades are implemented +func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { + current, err := UnsafeDetectSchemaVersion(lg, tx) + if err != nil { + return fmt.Errorf("cannot determine storage version: %w", err) + } + plan, err := buildPlan(current, target) + if err != nil { + return fmt.Errorf("cannot create migration plan: %w", err) + } + return plan.unsafeExecute(lg, tx) +} + +func setupKeys(t *testing.T, tx backend.BatchTx, ver semver.Version) { + t.Helper() + switch ver { + case V3_4: + case V3_5: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + case V3_6: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_6) + case V3_7: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_7) + tx.UnsafePut(Meta, []byte("future-key"), []byte("")) + default: + t.Fatalf("Unsupported storage version") + } +} diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index dee79a0ab..e81cf2623 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -70,10 +70,10 @@ func TestEtctlutlMigrate(t *testing.T) { expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`, }, { - name: "Upgrade v3.5 to v3.6 should fail until it's implemented", - binary: lastReleaseBinary, - targetVersion: "3.6", - expectLogsSubString: "Error: storage version migration is not yet supported", + name: "Upgrade v3.5 to v3.6 should work", + binary: lastReleaseBinary, + targetVersion: "3.6", + expectStorageVersion: &schema.V3_6, }, { name: "Migrate v3.6 to v3.6 is no-op", @@ -84,7 +84,7 @@ func TestEtctlutlMigrate(t *testing.T) { { name: "Downgrade v3.6 to v3.5 should fail until it's implemented", targetVersion: "3.5", - expectLogsSubString: "Error: storage version migration is not yet supported", + expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported", expectStorageVersion: &schema.V3_6, }, {