server: Implement schema migrations

This commit is contained in:
Marek Siarkowicz 2021-08-18 17:36:30 +02:00
parent 9d81dde082
commit 0d15ff57e6
10 changed files with 940 additions and 94 deletions

View File

@ -103,29 +103,30 @@ func migrateCommandFunc(c *migrateConfig) error {
defer c.be.Close() defer c.be.Close()
lg := GetLogger() lg := GetLogger()
tx := c.be.BatchTx() tx := c.be.BatchTx()
tx.Lock()
current, err := schema.DetectSchemaVersion(lg, tx) current, err := schema.DetectSchemaVersion(lg, tx)
if err != nil { if err != nil {
tx.Unlock()
lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
return err return err
} }
if *current == *c.targetVersion { if current == *c.targetVersion {
tx.Unlock() lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(current)))
return nil return nil
} }
if c.force { err = schema.Migrate(lg, tx, *c.targetVersion)
unsafeMigrateForce(lg, tx, c.targetVersion) if err != nil {
tx.Unlock() if !c.force {
return err
}
lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(lg, tx, c.targetVersion)
}
c.be.ForceCommit() c.be.ForceCommit()
return nil return nil
} }
tx.Unlock()
return fmt.Errorf("storage version migration is not yet supported")
}
func unsafeMigrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.Lock()
defer tx.Unlock()
// Storage version is only supported since v3.6 // Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) { if target.LessThan(schema.V3_6) {
schema.UnsafeClearStorageVersion(tx) schema.UnsafeClearStorageVersion(tx)

View File

@ -0,0 +1,93 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.uber.org/zap"
)
type action interface {
// unsafeDo executes the action and returns revert action, when executed
// should restore the state from before.
unsafeDo(tx backend.BatchTx) (revert action, err error)
}
type setKeyAction struct {
Bucket backend.Bucket
FieldName []byte
FieldValue []byte
}
func (a setKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
tx.UnsafePut(a.Bucket, a.FieldName, a.FieldValue)
return revert, nil
}
type deleteKeyAction struct {
Bucket backend.Bucket
FieldName []byte
}
func (a deleteKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
tx.UnsafeDelete(a.Bucket, a.FieldName)
return revert, nil
}
func restoreFieldValueAction(tx backend.BatchTx, bucket backend.Bucket, fieldName []byte) action {
_, vs := tx.UnsafeRange(bucket, fieldName, nil, 1)
if len(vs) == 1 {
return &setKeyAction{
Bucket: bucket,
FieldName: fieldName,
FieldValue: vs[0],
}
}
return &deleteKeyAction{
Bucket: bucket,
FieldName: fieldName,
}
}
type ActionList []action
// unsafeExecute executes actions one by one. If one of actions returns error,
// it will revert them.
func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
var revertActions = make(ActionList, 0, len(as))
for _, a := range as {
revert, err := a.unsafeDo(tx)
if err != nil {
revertActions.unsafeExecuteInReverseOrder(lg, tx)
return err
}
revertActions = append(revertActions, revert)
}
return nil
}
// unsafeExecuteInReverseOrder executes actions in revered order. Will panic on
// action error. Should be used when reverting.
func (as ActionList) unsafeExecuteInReverseOrder(lg *zap.Logger, tx backend.BatchTx) {
for j := len(as) - 1; j >= 0; j-- {
_, err := as[j].unsafeDo(tx)
if err != nil {
lg.Panic("Cannot recover from revert error", zap.Error(err))
}
}
}

View File

@ -0,0 +1,170 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap/zaptest"
)
func TestActionIsReversible(t *testing.T) {
tcs := []struct {
name string
action action
state map[string]string
}{
{
name: "setKeyAction empty state",
action: setKeyAction{
Bucket: Meta,
FieldName: []byte("/test"),
FieldValue: []byte("1"),
},
},
{
name: "setKeyAction with key",
action: setKeyAction{
Bucket: Meta,
FieldName: []byte("/test"),
FieldValue: []byte("1"),
},
state: map[string]string{"/test": "2"},
},
{
name: "deleteKeyAction empty state",
action: deleteKeyAction{
Bucket: Meta,
FieldName: []byte("/test"),
},
},
{
name: "deleteKeyAction with key",
action: deleteKeyAction{
Bucket: Meta,
FieldName: []byte("/test"),
},
state: map[string]string{"/test": "2"},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
defer tx.Unlock()
UnsafeCreateMetaBucket(tx)
putKeyValues(tx, Meta, tc.state)
assertBucketState(t, tx, Meta, tc.state)
reverse, err := tc.action.unsafeDo(tx)
if err != nil {
t.Errorf("Failed to upgrade, err: %v", err)
}
_, err = reverse.unsafeDo(tx)
if err != nil {
t.Errorf("Failed to downgrade, err: %v", err)
}
assertBucketState(t, tx, Meta, tc.state)
})
}
}
func TestActionListRevert(t *testing.T) {
tcs := []struct {
name string
actions ActionList
expectState map[string]string
expectError error
}{
{
name: "Apply multiple actions",
actions: ActionList{
setKeyAction{Meta, []byte("/testKey1"), []byte("testValue1")},
setKeyAction{Meta, []byte("/testKey2"), []byte("testValue2")},
},
expectState: map[string]string{"/testKey1": "testValue1", "/testKey2": "testValue2"},
},
{
name: "Broken action should result in changes reverted",
actions: ActionList{
setKeyAction{Meta, []byte("/testKey1"), []byte("testValue1")},
brokenAction{},
setKeyAction{Meta, []byte("/testKey2"), []byte("testValue2")},
},
expectState: map[string]string{},
expectError: errBrokenAction,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
defer tx.Unlock()
UnsafeCreateMetaBucket(tx)
err := tc.actions.unsafeExecute(lg, tx)
if err != tc.expectError {
t.Errorf("Unexpected error or lack thereof, expected: %v, got: %v", tc.expectError, err)
}
assertBucketState(t, tx, Meta, tc.expectState)
})
}
}
type brokenAction struct{}
var errBrokenAction = fmt.Errorf("broken action error")
func (c brokenAction) unsafeDo(tx backend.BatchTx) (action, error) {
return nil, errBrokenAction
}
func putKeyValues(tx backend.BatchTx, bucket backend.Bucket, kvs map[string]string) {
for k, v := range kvs {
tx.UnsafePut(bucket, []byte(k), []byte(v))
}
}
func assertBucketState(t *testing.T, tx backend.BatchTx, bucket backend.Bucket, expect map[string]string) {
t.Helper()
got := map[string]string{}
ks, vs := tx.UnsafeRange(bucket, []byte("\x00"), []byte("\xff"), 0)
for i := 0; i < len(ks); i++ {
got[string(ks[i])] = string(vs[i])
}
if expect == nil {
expect = map[string]string{}
}
assert.Equal(t, expect, got)
}

View File

@ -0,0 +1,50 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import "go.etcd.io/etcd/server/v3/storage/backend"
type schemaChange interface {
upgradeAction() action
downgradeAction() action
}
// addNewField represents adding new field when upgrading. Downgrade will remove the field.
func addNewField(bucket backend.Bucket, fieldName []byte, fieldValue []byte) schemaChange {
return simpleSchemaChange{
upgrade: setKeyAction{
Bucket: bucket,
FieldName: fieldName,
FieldValue: fieldValue,
},
downgrade: deleteKeyAction{
Bucket: bucket,
FieldName: fieldName,
},
}
}
type simpleSchemaChange struct {
upgrade action
downgrade action
}
func (c simpleSchemaChange) upgradeAction() action {
return c.upgrade
}
func (c simpleSchemaChange) downgradeAction() action {
return c.downgrade
}

View File

@ -0,0 +1,61 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"testing"
"time"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
func TestUpgradeDowngrade(t *testing.T) {
tcs := []struct {
name string
change schemaChange
expectStateAfterUpgrade map[string]string
expectStateAfterDowngrade map[string]string
}{
{
name: "addNewField empty",
change: addNewField(Meta, []byte("/test"), []byte("1")),
expectStateAfterUpgrade: map[string]string{"/test": "1"},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
defer tx.Unlock()
UnsafeCreateMetaBucket(tx)
_, err := tc.change.upgradeAction().unsafeDo(tx)
if err != nil {
t.Errorf("Failed to upgrade, err: %v", err)
}
assertBucketState(t, tx, Meta, tc.expectStateAfterUpgrade)
_, err = tc.change.downgradeAction().unsafeDo(tx)
if err != nil {
t.Errorf("Failed to downgrade, err: %v", err)
}
assertBucketState(t, tx, Meta, tc.expectStateAfterDowngrade)
})
}
}

View File

@ -0,0 +1,119 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"fmt"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.uber.org/zap"
)
type migrationPlan []migrationStep
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) {
if current.Major != target.Major {
lg.Error("Changing major storage version is not supported",
zap.String("storage-version", current.String()),
zap.String("target-storage-version", target.String()),
)
return nil, fmt.Errorf("Changing major storage version is not supported")
}
// TODO(serathius): Implement downgrades
if current.Minor > target.Minor {
lg.Error("Target version is lower than the current version, downgrades are not yet supported",
zap.String("storage-version", current.String()),
zap.String("target-storage-version", target.String()),
)
return nil, fmt.Errorf("downgrades are not yet supported")
}
return buildPlan(current, target)
}
func buildPlan(current semver.Version, target semver.Version) (plan migrationPlan, err error) {
for current.Minor != target.Minor {
isUpgrade := current.Minor < target.Minor
changes, err := schemaChangesForVersion(current, isUpgrade)
if err != nil {
return plan, err
}
step := newMigrationStep(current, isUpgrade, changes)
plan = append(plan, step)
current = step.target
}
return plan, nil
}
func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
return p.unsafeExecute(lg, tx)
}
func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) (err error) {
for _, s := range p {
err = s.unsafeExecute(lg, tx)
if err != nil {
return err
}
lg.Info("upgraded storage version", zap.String("new-storage-version", s.target.String()))
}
return nil
}
// migrationStep represents a single migrationStep of migrating etcd storage between two minor versions.
type migrationStep struct {
target semver.Version
actions ActionList
}
func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) (step migrationStep) {
step.actions = make(ActionList, len(changes))
for i, change := range changes {
if isUpgrade {
step.actions[i] = change.upgradeAction()
} else {
step.actions[len(changes)-1-i] = change.downgradeAction()
}
}
if isUpgrade {
step.target = semver.Version{Major: v.Major, Minor: v.Minor + 1}
} else {
step.target = semver.Version{Major: v.Major, Minor: v.Minor - 1}
}
return step
}
// execute runs actions required to migrate etcd storage between two minor versions.
func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
return s.unsafeExecute(lg, tx)
}
// unsafeExecute is non thread-safe version of execute.
func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
err := s.actions.unsafeExecute(lg, tx)
if err != nil {
return err
}
// Storage version is available since v3.6, downgrading target v3.5 should clean this field.
if !s.target.LessThan(V3_6) {
UnsafeSetStorageVersion(tx, &s.target)
}
return nil
}

View File

@ -0,0 +1,179 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"fmt"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap/zaptest"
)
func TestMigrationStepExecute(t *testing.T) {
recorder := &actionRecorder{}
errorC := fmt.Errorf("error C")
tcs := []struct {
name string
currentVersion semver.Version
isUpgrade bool
changes []schemaChange
expectError error
expectVersion *semver.Version
expectRecordedActions []string
}{
{
name: "Upgrade execute changes in order and updates version",
currentVersion: semver.Version{Major: 99, Minor: 0},
isUpgrade: true,
changes: []schemaChange{
recorder.changeMock("A"),
recorder.changeMock("B"),
},
expectVersion: &semver.Version{Major: 99, Minor: 1},
expectRecordedActions: []string{"upgrade A", "upgrade B"},
},
{
name: "Downgrade execute changes in reversed order and downgrades version",
currentVersion: semver.Version{Major: 99, Minor: 1},
isUpgrade: false,
changes: []schemaChange{
recorder.changeMock("A"),
recorder.changeMock("B"),
},
expectVersion: &semver.Version{Major: 99, Minor: 0},
expectRecordedActions: []string{"downgrade B", "downgrade A"},
},
{
name: "Failure during upgrade should revert previous changes in reversed order and not change version",
currentVersion: semver.Version{Major: 99, Minor: 0},
isUpgrade: true,
changes: []schemaChange{
recorder.changeMock("A"),
recorder.changeMock("B"),
recorder.changeError(errorC),
recorder.changeMock("D"),
recorder.changeMock("E"),
},
expectVersion: &semver.Version{Major: 99, Minor: 0},
expectRecordedActions: []string{"upgrade A", "upgrade B", "upgrade error C", "revert upgrade B", "revert upgrade A"},
expectError: errorC,
},
{
name: "Failure during downgrade should revert previous changes in reversed order and not change version",
currentVersion: semver.Version{Major: 99, Minor: 0},
isUpgrade: false,
changes: []schemaChange{
recorder.changeMock("A"),
recorder.changeMock("B"),
recorder.changeError(errorC),
recorder.changeMock("D"),
recorder.changeMock("E"),
},
expectVersion: &semver.Version{Major: 99, Minor: 0},
expectRecordedActions: []string{"downgrade E", "downgrade D", "downgrade error C", "revert downgrade D", "revert downgrade E"},
expectError: errorC,
},
{
name: "Downgrade below to below v3.6 doesn't leave storage version as it was not supported then",
currentVersion: semver.Version{Major: 3, Minor: 6},
changes: schemaChanges[V3_6],
isUpgrade: false,
expectVersion: nil,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
recorder.actions = []string{}
if tc.expectRecordedActions == nil {
tc.expectRecordedActions = []string{}
}
lg := zaptest.NewLogger(t)
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
defer tx.Unlock()
UnsafeCreateMetaBucket(tx)
UnsafeSetStorageVersion(tx, &tc.currentVersion)
step := newMigrationStep(tc.currentVersion, tc.isUpgrade, tc.changes)
err := step.unsafeExecute(lg, tx)
if err != tc.expectError {
t.Errorf("Unexpected error or lack thereof, expected: %v, got: %v", tc.expectError, err)
}
v := UnsafeReadStorageVersion(tx)
assert.Equal(t, tc.expectVersion, v)
assert.Equal(t, tc.expectRecordedActions, recorder.actions)
})
}
}
type actionRecorder struct {
actions []string
}
func (r *actionRecorder) changeMock(name string) schemaChange {
return changeMock(r, name, nil)
}
func (r *actionRecorder) changeError(err error) schemaChange {
return changeMock(r, fmt.Sprintf("%v", err), err)
}
func changeMock(recorder *actionRecorder, name string, err error) schemaChange {
return simpleSchemaChange{
upgrade: actionMock{
recorder: recorder,
name: "upgrade " + name,
err: err,
},
downgrade: actionMock{
recorder: recorder,
name: "downgrade " + name,
err: err,
},
}
}
type actionMock struct {
recorder *actionRecorder
name string
err error
}
func (a actionMock) unsafeDo(tx backend.BatchTx) (action, error) {
a.recorder.actions = append(a.recorder.actions, a.name)
return actionMock{
recorder: a.recorder,
name: "revert " + a.name,
}, a.err
}

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.uber.org/zap" "go.uber.org/zap"
"go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/backend"
@ -26,41 +27,83 @@ import (
var ( var (
V3_5 = semver.Version{Major: 3, Minor: 5} V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6} V3_6 = semver.Version{Major: 3, Minor: 6}
currentVersion semver.Version
) )
// UpdateStorageSchema updates storage version. func init() {
v := semver.New(version.Version)
currentVersion = semver.Version{Major: v.Major, Minor: v.Minor}
}
// UpdateStorageSchema updates storage schema to etcd binary version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error { func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
return Migrate(lg, tx, currentVersion)
}
// Migrate updates storage schema to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
v, err := DetectSchemaVersion(lg, tx) current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil { if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err) return fmt.Errorf("cannot determine storage version: %w", err)
} }
switch *v { plan, err := newPlan(lg, current, target)
case V3_5: if err != nil {
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String())) return fmt.Errorf("cannot create migration plan: %w", err)
// All meta keys introduced in v3.6 should be filled in here.
UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
} }
return nil return plan.unsafeExecute(lg, tx)
} }
func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) { // DetectSchemaVersion returns version of storage schema. Returned value depends on etcd version that created the backend. For
v := UnsafeReadStorageVersion(tx) // * v3.6 and newer will return storage version.
if v != nil { // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot).
return v, nil // * v3.4 and older is not supported and will return error.
func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
tx.Lock()
defer tx.Unlock()
return UnsafeDetectSchemaVersion(lg, tx)
}
// UnsafeDetectSchemaVersion non thread safe version of DetectSchemaVersion.
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
vp := UnsafeReadStorageVersion(tx)
if vp != nil {
return *vp, nil
} }
confstate := UnsafeConfStateFromBackend(lg, tx) confstate := UnsafeConfStateFromBackend(lg, tx)
if confstate == nil { if confstate == nil {
return nil, fmt.Errorf("missing confstate information") return v, fmt.Errorf("missing confstate information")
} }
_, term := UnsafeReadConsistentIndex(tx) _, term := UnsafeReadConsistentIndex(tx)
if term == 0 { if term == 0 {
return nil, fmt.Errorf("missing term information") return v, fmt.Errorf("missing term information")
} }
copied := V3_5 return V3_5, nil
return &copied, nil
} }
func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
// changes should be taken from higher version
if isUpgrade {
v = semver.Version{Major: v.Major, Minor: v.Minor + 1}
}
actions, found := schemaChanges[v]
if !found {
return nil, fmt.Errorf("version %q is not supported", v.String())
}
return actions, nil
}
var (
// schemaChanges list changes that were introduced in perticular version.
// schema was introduced in v3.6 as so its changes were not tracked before.
schemaChanges = map[semver.Version][]schemaChange{
V3_6: {
addNewField(Meta, MetaStorageVersionName, emptyStorageVersion),
},
}
// emptyStorageVersion is used for v3.6 Step for the first time, in all other version StoragetVersion should be set by migrator.
// Adding a addNewField for StorageVersion we can reuselogic to remove it when downgrading to v3.5
emptyStorageVersion = []byte("")
)

View File

@ -15,80 +15,111 @@
package schema package schema
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap" "go.uber.org/zap"
) )
func TestUpdateStorageVersion(t *testing.T) { var (
V3_7 = semver.Version{Major: 3, Minor: 7}
)
func TestMigrate(t *testing.T) {
tcs := []struct { tcs := []struct {
name string name string
version string version semver.Version
setupKeys func(tx backend.BatchTx) // Overrides which keys should be set (default based on version)
overrideKeys func(tx backend.BatchTx)
targetVersion semver.Version
expectVersion *semver.Version expectVersion *semver.Version
expectError bool expectError bool
expectedErrorMsg string expectErrorMsg string
}{ }{
// As storage version field was added in v3.6, for v3.5 we will not set it.
// For storage to be considered v3.5 it have both confstate and term key set.
{ {
name: `Backend before 3.6 without confstate should be rejected`, name: `Upgrading v3.5 to v3.6 should be rejected if confstate is not set`,
version: "", version: V3_5,
expectVersion: nil, overrideKeys: func(tx backend.BatchTx) {},
setupKeys: func(tx backend.BatchTx) {}, targetVersion: V3_6,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing confstate information`,
},
{
name: `Backend before 3.6 without term should be rejected`,
version: "",
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
},
expectVersion: nil, expectVersion: nil,
expectError: true, expectError: true,
expectedErrorMsg: `cannot determine storage version: missing term information`, expectErrorMsg: `cannot determine storage version: missing confstate information`,
}, },
{ {
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`,
version: "", version: V3_5,
setupKeys: func(tx backend.BatchTx) { overrideKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
}, },
expectVersion: &semver.Version{Major: 3, Minor: 6}, targetVersion: V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing term information`,
}, },
{ {
name: "Backend in 3.6.0 should be skipped", name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`,
version: "3.6.0", version: V3_5,
setupKeys: func(tx backend.BatchTx) { targetVersion: V3_6,
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) expectVersion: &V3_6,
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 6},
}, },
{ {
name: "Backend with current version should be skipped", name: `Migrate on same v3.5 version passes and doesn't set storage version'`,
version: version.Version, version: V3_5,
setupKeys: func(tx backend.BatchTx) { targetVersion: V3_5,
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) expectVersion: nil,
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 6},
}, },
{ {
name: "Backend in 3.7.0 should be skipped", name: `Migrate on same v3.6 version passes`,
version: "3.7.0", version: V3_6,
setupKeys: func(tx backend.BatchTx) { targetVersion: V3_6,
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) expectVersion: &V3_6,
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
}, },
expectVersion: &semver.Version{Major: 3, Minor: 7}, {
name: `Migrate on same v3.7 version passes`,
version: V3_7,
targetVersion: V3_7,
expectVersion: &V3_7,
},
{
name: "Upgrading 3.6 to v3.7 is not supported",
version: V3_6,
targetVersion: V3_7,
expectVersion: &V3_6,
expectError: true,
expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`,
},
{
name: "Downgrading v3.7 to v3.6 is not supported",
version: V3_7,
targetVersion: V3_6,
expectVersion: &V3_7,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
},
{
name: "Downgrading v3.6 to v3.5 is not supported",
version: V3_6,
targetVersion: V3_5,
expectVersion: &V3_6,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
},
{
name: "Downgrading v3.5 to v3.4 is not supported",
version: V3_5,
targetVersion: V3_4,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -101,9 +132,10 @@ func TestUpdateStorageVersion(t *testing.T) {
} }
tx.Lock() tx.Lock()
UnsafeCreateMetaBucket(tx) UnsafeCreateMetaBucket(tx)
tc.setupKeys(tx) if tc.overrideKeys != nil {
if tc.version != "" { tc.overrideKeys(tx)
UnsafeSetStorageVersion(tx, semver.New(tc.version)) } else {
setupKeys(t, tx, tc.version)
} }
tx.Unlock() tx.Unlock()
be.ForceCommit() be.ForceCommit()
@ -111,15 +143,113 @@ func TestUpdateStorageVersion(t *testing.T) {
b := backend.NewDefaultBackend(tmpPath) b := backend.NewDefaultBackend(tmpPath)
defer b.Close() defer b.Close()
err := UpdateStorageSchema(lg, b.BatchTx()) err := Migrate(lg, b.BatchTx(), tc.targetVersion)
if (err != nil) != tc.expectError { if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError) t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
} }
if err != nil && err.Error() != tc.expectedErrorMsg { if err != nil && err.Error() != tc.expectErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg) t.Errorf("Migrate(lg, tx, %q) = %q, expected error message: %q", tc.targetVersion, err, tc.expectErrorMsg)
} }
v := UnsafeReadStorageVersion(b.BatchTx()) v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v) assert.Equal(t, tc.expectVersion, v)
}) })
} }
} }
func TestMigrateIsReversible(t *testing.T) {
tcs := []struct {
initialVersion semver.Version
state map[string]string
}{
{
initialVersion: V3_5,
state: map[string]string{
"confState": `{"auto_leave":false}`,
"consistent_index": "\x00\x00\x00\x00\x00\x00\x00\x01",
"term": "\x00\x00\x00\x00\x00\x00\x00\x01",
},
},
{
initialVersion: V3_6,
state: map[string]string{
"confState": `{"auto_leave":false}`,
"consistent_index": "\x00\x00\x00\x00\x00\x00\x00\x01",
"term": "\x00\x00\x00\x00\x00\x00\x00\x01",
"storageVersion": "3.6.0",
},
},
}
for _, tc := range tcs {
t.Run(tc.initialVersion.String(), func(t *testing.T) {
lg := zap.NewNop()
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
setupKeys(t, tx, tc.initialVersion)
assertBucketState(t, tx, Meta, tc.state)
tx.Unlock()
// Upgrade to current version
tx.Lock()
err := testUnsafeMigrate(lg, be.BatchTx(), currentVersion)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", currentVersion, err)
}
assert.Equal(t, &currentVersion, UnsafeReadStorageVersion(tx))
tx.Unlock()
// Downgrade back to initial version
tx.Lock()
err = testUnsafeMigrate(lg, be.BatchTx(), tc.initialVersion)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err)
}
tx.Unlock()
// Assert that all changes were revered
tx.Lock()
assertBucketState(t, tx, Meta, tc.state)
tx.Unlock()
})
}
}
// Does the same as UnsafeMigrate but skips version checks
// TODO(serathius): Use UnsafeMigrate when downgrades are implemented
func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
plan, err := buildPlan(current, target)
if err != nil {
return fmt.Errorf("cannot create migration plan: %w", err)
}
return plan.unsafeExecute(lg, tx)
}
func setupKeys(t *testing.T, tx backend.BatchTx, ver semver.Version) {
t.Helper()
switch ver {
case V3_4:
case V3_5:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
case V3_6:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_7:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_7)
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
default:
t.Fatalf("Unsupported storage version")
}
}

View File

@ -70,10 +70,10 @@ func TestEtctlutlMigrate(t *testing.T) {
expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`, expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`,
}, },
{ {
name: "Upgrade v3.5 to v3.6 should fail until it's implemented", name: "Upgrade v3.5 to v3.6 should work",
binary: lastReleaseBinary, binary: lastReleaseBinary,
targetVersion: "3.6", targetVersion: "3.6",
expectLogsSubString: "Error: storage version migration is not yet supported", expectStorageVersion: &schema.V3_6,
}, },
{ {
name: "Migrate v3.6 to v3.6 is no-op", name: "Migrate v3.6 to v3.6 is no-op",
@ -84,7 +84,7 @@ func TestEtctlutlMigrate(t *testing.T) {
{ {
name: "Downgrade v3.6 to v3.5 should fail until it's implemented", name: "Downgrade v3.6 to v3.5 should fail until it's implemented",
targetVersion: "3.5", targetVersion: "3.5",
expectLogsSubString: "Error: storage version migration is not yet supported", expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported",
expectStorageVersion: &schema.V3_6, expectStorageVersion: &schema.V3_6,
}, },
{ {