server: Implement storage schema migration to follow cluster version change and panic if unknown storage version is found

Storage version should follow the cluster version. During upgrades this
should be immediate, as the storage version can always be upgraded because
storage is backward compatible. During downgrades it will be delayed and will
require time for incompatible changes to be snapshotted.

As storage version change can happen long after cluster is running, we
need to add a step during bootstrap to validate if loaded data can be
understood by migrator.
This commit is contained in:
Marek Siarkowicz
2021-08-18 17:43:58 +02:00
parent 0d15ff57e6
commit ff3729c4d5
9 changed files with 319 additions and 80 deletions

View File

@@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/
See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
### Breaking Changes
- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.
### etcdctl v3
- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).

View File

@@ -18,6 +18,8 @@ import (
"context"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/version"
@@ -28,6 +30,14 @@ import (
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
// tx is the backend batch transaction acquired by Lock and released by
// Unlock; nil whenever the adapter does not currently hold the lock.
tx backend.BatchTx
}
// newServerVersionAdapter wraps an EtcdServer so it can be passed to
// serverversion.NewMonitor as a serverversion.Server implementation.
func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
	// tx is deliberately left as its zero value (nil); Lock assigns it
	// before any storage access takes place.
	adapter := &serverVersionAdapter{EtcdServer: s}
	return adapter
}
var _ serverversion.Server = (*serverVersionAdapter)(nil)
@@ -56,3 +66,36 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
// GetVersions returns the server/cluster versions reported by each cluster
// member, as gathered over the peer transport.
func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
}
// GetStorageVersion reads the schema version persisted in the backend.
// If the adapter does not already hold the backend lock (s.tx == nil) it
// acquires it for the duration of the read; otherwise it reuses the
// transaction taken by a prior Lock call.
// It returns nil when the schema version cannot be detected (e.g. data
// written before the version field was introduced).
func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
if err != nil {
// Undetectable schema version is reported as "not set" rather than an error.
return nil
}
return &v
}
// UpdateStorageVersion migrates the storage schema to the target version.
// If the adapter does not already hold the backend lock (s.tx == nil) it
// acquires it for the duration of the migration.
// Migration failures are logged but not returned to the caller; the monitor
// will retry on its next interval.
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
err := schema.UnsafeMigrate(s.lg, s.tx, target)
if err != nil {
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
}
}
// Lock acquires the backend batch transaction and stashes it in s.tx so the
// Unsafe* schema helpers called between Lock and Unlock reuse one transaction.
func (s *serverVersionAdapter) Lock() {
s.tx = s.be.BatchTx()
s.tx.Lock()
}
// Unlock releases the batch transaction taken by Lock and clears s.tx so a
// later call knows the lock is no longer held.
func (s *serverVersionAdapter) Unlock() {
s.tx.Unlock()
s.tx = nil
}

View File

@@ -147,16 +147,29 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
ci = cindex.NewConsistentIndex(nil)
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
be = serverstorage.OpenBackend(cfg, beHooks)
defer func() {
if err != nil && be != nil {
be.Close()
}
}()
ci.SetBackend(be)
schema.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
err := maybeDefragBackend(cfg, be)
err = maybeDefragBackend(cfg, be)
if err != nil {
be.Close()
return nil, nil, false, nil, err
}
}
cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
// TODO(serathius): Implement schema setup in fresh storage
if beExist {
err = schema.Validate(cfg.Logger, be.BatchTx())
if err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, nil, false, nil, err
}
}
return be, ci, beExist, beHooks, nil
}

View File

@@ -291,9 +291,6 @@ type EtcdServer struct {
firstCommitInTerm *notify.Notifier
*AccessController
// Ensure that storage schema is updated only once.
updateStorageSchema sync.Once
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@@ -516,7 +513,8 @@ func (s *EtcdServer) Start() {
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
s.GoAttach(s.monitorVersions)
s.GoAttach(s.monitorClusterVersions)
s.GoAttach(s.monitorStorageVersion)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorDowngrade)
@@ -2071,12 +2069,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
s.updateStorageSchema.Do(func() {
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
})
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
@@ -2137,9 +2129,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
return s.cluster.Version()
}
// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
func (s *EtcdServer) monitorVersions() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
// monitorClusterVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
func (s *EtcdServer) monitorClusterVersions() {
monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s))
for {
select {
case <-s.firstCommitInTerm.Receive():
@@ -2155,6 +2147,19 @@ func (s *EtcdServer) monitorVersions() {
}
}
// monitorStorageVersion periodically (every monitorVersionInterval) checks
// whether the storage schema version needs to be moved toward the cluster
// version, until the server begins stopping.
func (s *EtcdServer) monitorStorageVersion() {
	monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s))
	for {
		select {
		case <-s.stopping:
			return
		case <-time.After(monitorVersionInterval):
		}
		monitor.UpdateStorageVersionIfNeeded()
	}
}
func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()
@@ -2233,7 +2238,7 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) {
// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed.
func (s *EtcdServer) monitorDowngrade() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s))
t := s.Cfg.DowngradeCheckTime
if t == 0 {
return

View File

@@ -34,6 +34,12 @@ type Server interface {
GetVersions() map[string]*version.Versions
UpdateClusterVersion(string)
DowngradeCancel()
GetStorageVersion() *semver.Version
UpdateStorageVersion(semver.Version)
Lock()
Unlock()
}
func NewMonitor(lg *zap.Logger, storage Server) *Monitor {
@@ -73,6 +79,24 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() {
}
}
// UpdateStorageVersionIfNeeded updates the storage version if it differs from cluster version.
// Only the Major.Minor portion of the cluster version is reconciled; a nil
// cluster version means the cluster has not decided a version yet, so no
// action is taken. The storage is locked for the whole read-compare-update
// sequence so the version cannot change in between.
func (m *Monitor) UpdateStorageVersionIfNeeded() {
	cv := m.s.GetClusterVersion()
	if cv == nil {
		return
	}
	m.s.Lock()
	defer m.s.Unlock()
	sv := m.s.GetStorageVersion()
	if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor {
		if sv != nil {
			// Fixed log message: it previously read "storage version differs
			// from storage version", but the first field is the cluster version.
			m.lg.Info("cluster version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String()))
		}
		m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
	}
}
func (m *Monitor) CancelDowngradeIfNeeded() {
d := m.s.GetDowngradeInfo()
if !d.Enabled {

View File

@@ -13,6 +13,11 @@ import (
var testLogger = zap.NewExample()
var (
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)
func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
@@ -52,6 +57,55 @@ func TestDecideClusterVersion(t *testing.T) {
}
}
// TestDecideStorageVersion verifies that UpdateStorageVersionIfNeeded moves
// the storage version toward the cluster version (Major.Minor only), and
// leaves it untouched when the cluster version is unset or already matches.
func TestDecideStorageVersion(t *testing.T) {
tests := []struct {
name string
clusterVersion *semver.Version
storageVersion *semver.Version
// expectStorageVersion is the storage version after the monitor runs;
// nil means the version should remain unset.
expectStorageVersion *semver.Version
}{
{
name: "No action if cluster version is nil",
},
{
name: "Should set storage version if cluster version is set",
clusterVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "No action if storage version was already set",
storageVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "No action if storage version equals cluster version",
clusterVersion: &V3_5,
storageVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "Should set storage version to cluster version",
clusterVersion: &V3_6,
storageVersion: &V3_5,
expectStorageVersion: &V3_6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &storageMock{
clusterVersion: tt.clusterVersion,
storageVersion: tt.storageVersion,
}
monitor := NewMonitor(testLogger, s)
monitor.UpdateStorageVersionIfNeeded()
// reflect.DeepEqual handles the nil-pointer expectation as well as values.
if !reflect.DeepEqual(s.storageVersion, tt.expectStorageVersion) {
t.Errorf("Unexpected storage version value, got = %+v, want %+v", s.storageVersion, tt.expectStorageVersion)
}
})
}
}
func TestVersionMatchTarget(t *testing.T) {
tests := []struct {
name string
@@ -107,7 +161,9 @@ func TestVersionMatchTarget(t *testing.T) {
// storageMock is an in-memory test double for the Server interface; it keeps
// versions in plain fields instead of a backend.
type storageMock struct {
versions map[string]*version.Versions
clusterVersion *semver.Version
storageVersion *semver.Version
downgradeInfo *membership.DowngradeInfo
// locked tracks whether Lock is currently held; Lock panics when already
// true so tests surface re-entrant locking by the Monitor.
locked bool
}
var _ Server = (*storageMock)(nil)
@@ -131,3 +187,22 @@ func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo {
// GetVersions returns the canned member-version map configured on the mock.
func (s *storageMock) GetVersions() map[string]*version.Versions {
return s.versions
}
// GetStorageVersion returns the in-memory storage version (nil if unset).
func (s *storageMock) GetStorageVersion() *semver.Version {
return s.storageVersion
}
// UpdateStorageVersion records v as the mock's new storage version.
func (s *storageMock) UpdateStorageVersion(v semver.Version) {
s.storageVersion = &v
}
// Lock marks the mock as held. It panics on re-entrant locking so tests
// immediately catch a double-Lock in the Monitor's lock discipline.
func (s *storageMock) Lock() {
if s.locked {
panic("Deadlock")
}
s.locked = true
}
// Unlock releases the mock lock set by Lock.
func (s *storageMock) Unlock() {
s.locked = false
}

View File

@@ -88,8 +88,9 @@ var (
// DefaultIgnores reports whether the given bucket/key pair is expected to
// legitimately differ between members and should therefore be skipped when
// comparing backends. Only keys in the Meta bucket are ever ignored.
// NOTE(review): this span is diff residue — it still contains both the old
// and the new return expression; only the second (with MetaStorageVersionName)
// belongs in the final code.
func DefaultIgnores(bucket, key []byte) bool {
// consistent index & term might be changed due to v2 internal sync, which
// is not controllable by the user.
// storage version might change after a wal snapshot and is not controllable by the user.
return bytes.Compare(bucket, Meta.Name()) == 0 &&
(bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0)
(bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0 || bytes.Compare(key, MetaStorageVersionName) == 0)
}
func BackendMemberKey(id types.ID) []byte {

View File

@@ -25,28 +25,45 @@ import (
)
var (
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
currentVersion semver.Version
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)
func init() {
v := semver.New(version.Version)
currentVersion = semver.Version{Major: v.Major, Minor: v.Minor}
// Validate checks provided backend to confirm that schema used is supported.
// It takes the batch-transaction lock for the duration of the check.
func Validate(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
return unsafeValidate(lg, tx)
}
// UpdateStorageSchema updates storage schema to etcd binary version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
return Migrate(lg, tx, currentVersion)
// unsafeValidate is the non-threadsafe core of Validate: it detects the
// current schema version and confirms a migration plan to the local binary
// version can be built. The caller must hold the tx lock.
func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
// v3.5 requires a wal snapshot to persist its fields, so we may not yet be
// able to assign it a schema version; treat this as valid and just warn.
lg.Warn("Failed to detect storage schema version. Please wait till wal snapshot before upgrading cluster.")
return nil
}
_, err = newPlan(lg, current, localBinaryVersion())
return err
}
// localBinaryVersion returns the Major.Minor portion of the running etcd
// binary's version, dropping patch and pre-release components.
func localBinaryVersion() semver.Version {
	parsed := semver.New(version.Version)
	return semver.Version{Major: parsed.Major, Minor: parsed.Minor}
}
// Migrate updates storage schema to provided target version.
// It takes the batch-transaction lock for the duration of the migration.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
tx.Lock()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, target)
}
// UnsafeMigrate is non-threadsafe version of Migrate.
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
return fmt.Errorf("cannot detect storage schema version: %w", err)
}
plan, err := newPlan(lg, current, target)
if err != nil {
@@ -65,7 +82,7 @@ func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, e
return UnsafeDetectSchemaVersion(lg, tx)
}
// UnsafeDetectSchemaVersion non thread safe version of DetectSchemaVersion.
// UnsafeDetectSchemaVersion non-threadsafe version of DetectSchemaVersion.
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
vp := UnsafeReadStorageVersion(tx)
if vp != nil {

View File

@@ -31,6 +31,70 @@ var (
V3_7 = semver.Version{Major: 3, Minor: 7}
)
// TestValidate checks that Validate accepts every known storage schema
// (v3.4 through v3.6, including partially-populated v3.5 layouts) and
// rejects data written by an unknown future version (v3.7).
func TestValidate(t *testing.T) {
tcs := []struct {
name string
version semver.Version
// Overrides which keys should be set (default based on version)
overrideKeys func(tx backend.BatchTx)
expectError bool
expectErrorMsg string
}{
// As the storage version field was added in v3.6, for v3.5 we will not set it.
// For storage to be considered v3.5 it must have both the confstate and term keys set.
{
name: `V3.4 schema is correct`,
version: V3_4,
},
{
name: `V3.5 schema without confstate and term fields is correct`,
version: V3_5,
overrideKeys: func(tx backend.BatchTx) {},
},
{
name: `V3.5 schema without term field is correct`,
version: V3_5,
overrideKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
},
},
{
name: `V3.5 schema with all fields is correct`,
version: V3_5,
overrideKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
},
{
name: `V3.6 schema is correct`,
version: V3_6,
},
{
name: `V3.7 schema is unknown and should return error`,
version: V3_7,
expectError: true,
expectErrorMsg: "downgrades are not yet supported",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
// Build a backend file populated per the schema version (or overrides).
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)
b := backend.NewDefaultBackend(dataPath)
defer b.Close()
err := Validate(lg, b.BatchTx())
if (err != nil) != tc.expectError {
t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError)
}
if err != nil && err.Error() != tc.expectErrorMsg {
t.Errorf("Validate(lg, tx) = %q, expected error message: %q", err, tc.expectErrorMsg)
}
})
}
}
func TestMigrate(t *testing.T) {
tcs := []struct {
name string
@@ -52,7 +116,7 @@ func TestMigrate(t *testing.T) {
targetVersion: V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing confstate information`,
expectErrorMsg: `cannot detect storage schema version: missing confstate information`,
},
{
name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`,
@@ -63,7 +127,7 @@ func TestMigrate(t *testing.T) {
targetVersion: V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing term information`,
expectErrorMsg: `cannot detect storage schema version: missing term information`,
},
{
name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`,
@@ -125,23 +189,9 @@ func TestMigrate(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
if tc.overrideKeys != nil {
tc.overrideKeys(tx)
} else {
setupKeys(t, tx, tc.version)
}
tx.Unlock()
be.ForceCommit()
be.Close()
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)
b := backend.NewDefaultBackend(tmpPath)
b := backend.NewDefaultBackend(dataPath)
defer b.Close()
err := Migrate(lg, b.BatchTx(), tc.targetVersion)
if (err != nil) != tc.expectError {
@@ -182,39 +232,31 @@ func TestMigrateIsReversible(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.initialVersion.String(), func(t *testing.T) {
lg := zap.NewNop()
be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10)
dataPath := setupBackendData(t, tc.initialVersion, nil)
be := backend.NewDefaultBackend(dataPath)
defer be.Close()
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
setupKeys(t, tx, tc.initialVersion)
defer tx.Unlock()
assertBucketState(t, tx, Meta, tc.state)
tx.Unlock()
// Upgrade to current version
tx.Lock()
err := testUnsafeMigrate(lg, be.BatchTx(), currentVersion)
ver := localBinaryVersion()
err := testUnsafeMigrate(lg, tx, ver)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", currentVersion, err)
t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err)
}
assert.Equal(t, &currentVersion, UnsafeReadStorageVersion(tx))
tx.Unlock()
assert.Equal(t, &ver, UnsafeReadStorageVersion(tx))
// Downgrade back to initial version
tx.Lock()
err = testUnsafeMigrate(lg, be.BatchTx(), tc.initialVersion)
err = testUnsafeMigrate(lg, tx, tc.initialVersion)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err)
}
tx.Unlock()
// Assert that all changes were revered
tx.Lock()
assertBucketState(t, tx, Meta, tc.state)
tx.Unlock()
})
}
}
@@ -233,23 +275,38 @@ func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version
return plan.unsafeExecute(lg, tx)
}
func setupKeys(t *testing.T, tx backend.BatchTx, ver semver.Version) {
func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string {
t.Helper()
switch ver {
case V3_4:
case V3_5:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
case V3_6:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_7:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_7)
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
default:
t.Fatalf("Unsupported storage version")
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
if overrideKeys != nil {
overrideKeys(tx)
} else {
switch version {
case V3_4:
case V3_5:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
case V3_6:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_7:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeSetStorageVersion(tx, &V3_7)
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
default:
t.Fatalf("Unsupported storage version")
}
}
tx.Unlock()
be.ForceCommit()
be.Close()
return tmpPath
}