diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md index 1094c2663..360f25f49 100644 --- a/CHANGELOG-3.6.md +++ b/CHANGELOG-3.6.md @@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). +### Breaking Changes + +- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command. + ### etcdctl v3 - Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133). diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index b00e721f7..a6611359c 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -18,6 +18,8 @@ import ( "context" "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" "go.etcd.io/etcd/api/v3/version" @@ -28,6 +30,14 @@ import ( // serverVersionAdapter implements Server interface needed by serverversion.Monitor type serverVersionAdapter struct { *EtcdServer + tx backend.BatchTx +} + +func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter { + return &serverVersionAdapter{ + EtcdServer: s, + tx: nil, + } } var _ serverversion.Server = (*serverVersionAdapter)(nil) @@ -56,3 +66,36 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions { return getVersions(s.lg, s.cluster, s.id, s.peerRt) } + +func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { + if s.tx == nil { + s.Lock() + defer s.Unlock() + } + v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx) + if err != nil { + return nil + } + return &v +} + +func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) { + if s.tx == nil { + s.Lock() + defer s.Unlock() + } + err 
:= schema.UnsafeMigrate(s.lg, s.tx, target) + if err != nil { + s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err)) + } +} + +func (s *serverVersionAdapter) Lock() { + s.tx = s.be.BatchTx() + s.tx.Lock() +} + +func (s *serverVersionAdapter) Unlock() { + s.tx.Unlock() + s.tx = nil +} diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 9a31f659c..da9cc6da0 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -147,16 +147,29 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co ci = cindex.NewConsistentIndex(nil) beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci) be = serverstorage.OpenBackend(cfg, beHooks) + defer func() { + if err != nil && be != nil { + be.Close() + } + }() ci.SetBackend(be) schema.CreateMetaBucket(be.BatchTx()) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { - err := maybeDefragBackend(cfg, be) + err = maybeDefragBackend(cfg, be) if err != nil { - be.Close() return nil, nil, false, nil, err } } cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex())) + + // TODO(serathius): Implement schema setup in fresh storage + if beExist { + err = schema.Validate(cfg.Logger, be.BatchTx()) + if err != nil { + cfg.Logger.Error("Failed to validate schema", zap.Error(err)) + return nil, nil, false, nil, err + } + } return be, ci, beExist, beHooks, nil } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5db8f31ce..c491b8225 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -291,9 +291,6 @@ type EtcdServer struct { firstCommitInTerm *notify.Notifier *AccessController - - // Ensure that storage schema is updated only once. - updateStorageSchema sync.Once } // NewServer creates a new EtcdServer from the supplied configuration. 
The @@ -516,7 +513,8 @@ func (s *EtcdServer) Start() { s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.GoAttach(s.purgeFile) s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) }) - s.GoAttach(s.monitorVersions) + s.GoAttach(s.monitorClusterVersions) + s.GoAttach(s.monitorStorageVersion) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) s.GoAttach(s.monitorDowngrade) @@ -2071,12 +2069,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), ) - s.updateStorageSchema.Do(func() { - err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx()) - if err != nil { - s.lg.Warn("failed to update storage version", zap.Error(err)) - } - }) // When sending a snapshot, etcd will pause compaction. // After receives a snapshot, the slow follower needs to get all the entries right after @@ -2137,9 +2129,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { return s.cluster.Version() } -// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed. -func (s *EtcdServer) monitorVersions() { - monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) +// monitorClusterVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed. +func (s *EtcdServer) monitorClusterVersions() { + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) for { select { case <-s.firstCommitInTerm.Receive(): @@ -2155,6 +2147,19 @@ func (s *EtcdServer) monitorVersions() { } } +// monitorStorageVersion every monitorVersionInterval updates storage version if needed. 
+func (s *EtcdServer) monitorStorageVersion() { + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) + for { + select { + case <-time.After(monitorVersionInterval): + case <-s.stopping: + return + } + monitor.UpdateStorageVersionIfNeeded() + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() @@ -2233,7 +2238,7 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) { // monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed. func (s *EtcdServer) monitorDowngrade() { - monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) t := s.Cfg.DowngradeCheckTime if t == 0 { return diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 0db7be625..19e91f7ef 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -34,6 +34,12 @@ type Server interface { GetVersions() map[string]*version.Versions UpdateClusterVersion(string) DowngradeCancel() + + GetStorageVersion() *semver.Version + UpdateStorageVersion(semver.Version) + + Lock() + Unlock() } func NewMonitor(lg *zap.Logger, storage Server) *Monitor { @@ -73,6 +79,24 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() { } } +// UpdateStorageVersionIfNeeded updates the storage version if it differs from cluster version. 
+func (m *Monitor) UpdateStorageVersionIfNeeded() { + cv := m.s.GetClusterVersion() + if cv == nil { + return + } + m.s.Lock() + defer m.s.Unlock() + sv := m.s.GetStorageVersion() + + if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor { + if sv != nil { + m.lg.Info("storage version differs from cluster version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String())) + } + m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor}) + } +} + func (m *Monitor) CancelDowngradeIfNeeded() { d := m.s.GetDowngradeInfo() if !d.Enabled { diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index 1a712ef08..b76915b11 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -13,6 +13,11 @@ import ( var testLogger = zap.NewExample() +var ( + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} +) + func TestDecideClusterVersion(t *testing.T) { tests := []struct { vers map[string]*version.Versions @@ -52,6 +57,55 @@ func TestDecideClusterVersion(t *testing.T) { } } +func TestDecideStorageVersion(t *testing.T) { + tests := []struct { + name string + clusterVersion *semver.Version + storageVersion *semver.Version + expectStorageVersion *semver.Version + }{ + { + name: "No action if cluster version is nil", + }, + { + name: "Should set storage version if cluster version is set", + clusterVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version was already set", + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version equals cluster version", + clusterVersion: &V3_5, + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "Should set storage version to cluster version", + clusterVersion: &V3_6, + storageVersion: &V3_5, + expectStorageVersion: &V3_6, + }, + } + + for _, tt := range tests { + 
t.Run(tt.name, func(t *testing.T) { + s := &storageMock{ + clusterVersion: tt.clusterVersion, + storageVersion: tt.storageVersion, + } + monitor := NewMonitor(testLogger, s) + monitor.UpdateStorageVersionIfNeeded() + if !reflect.DeepEqual(s.storageVersion, tt.expectStorageVersion) { + t.Errorf("Unexpected storage version value, got = %+v, want %+v", s.storageVersion, tt.expectStorageVersion) + } + }) + } +} + func TestVersionMatchTarget(t *testing.T) { tests := []struct { name string @@ -107,7 +161,9 @@ func TestVersionMatchTarget(t *testing.T) { type storageMock struct { versions map[string]*version.Versions clusterVersion *semver.Version + storageVersion *semver.Version downgradeInfo *membership.DowngradeInfo + locked bool } var _ Server = (*storageMock)(nil) @@ -131,3 +187,22 @@ func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo { func (s *storageMock) GetVersions() map[string]*version.Versions { return s.versions } + +func (s *storageMock) GetStorageVersion() *semver.Version { + return s.storageVersion +} + +func (s *storageMock) UpdateStorageVersion(v semver.Version) { + s.storageVersion = &v +} + +func (s *storageMock) Lock() { + if s.locked { + panic("Deadlock") + } + s.locked = true +} + +func (s *storageMock) Unlock() { + s.locked = false +} diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index c39ed71e0..e5eda721b 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -88,8 +88,9 @@ var ( func DefaultIgnores(bucket, key []byte) bool { // consistent index & term might be changed due to v2 internal sync, which // is not controllable by the user. + // storage version might change after wal snapshot and is not controllable by user. 
return bytes.Compare(bucket, Meta.Name()) == 0 && - (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0) + (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0 || bytes.Compare(key, MetaStorageVersionName) == 0) } func BackendMemberKey(id types.ID) []byte { diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index e498e8285..0fbdb7bb9 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -25,28 +25,45 @@ import ( ) var ( - V3_5 = semver.Version{Major: 3, Minor: 5} - V3_6 = semver.Version{Major: 3, Minor: 6} - currentVersion semver.Version + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} ) -func init() { - v := semver.New(version.Version) - currentVersion = semver.Version{Major: v.Major, Minor: v.Minor} +// Validate checks provided backend to confirm that schema used is supported. +func Validate(lg *zap.Logger, tx backend.BatchTx) error { + tx.Lock() + defer tx.Unlock() + return unsafeValidate(lg, tx) } -// UpdateStorageSchema updates storage schema to etcd binary version. -func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error { - return Migrate(lg, tx, currentVersion) +func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error { + current, err := UnsafeDetectSchemaVersion(lg, tx) + if err != nil { + // v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version. + lg.Warn("Failed to detect storage schema version. Please wait till wal snapshot before upgrading cluster.") + return nil + } + _, err = newPlan(lg, current, localBinaryVersion()) + return err +} + +func localBinaryVersion() semver.Version { + v := semver.New(version.Version) + return semver.Version{Major: v.Major, Minor: v.Minor} } // Migrate updates storage schema to provided target version. 
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { tx.Lock() defer tx.Unlock() + return UnsafeMigrate(lg, tx, target) +} + +// UnsafeMigrate is non-threadsafe version of Migrate. +func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { - return fmt.Errorf("cannot determine storage version: %w", err) + return fmt.Errorf("cannot detect storage schema version: %w", err) } plan, err := newPlan(lg, current, target) if err != nil { @@ -65,7 +82,7 @@ func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, e return UnsafeDetectSchemaVersion(lg, tx) } -// UnsafeDetectSchemaVersion non thread safe version of DetectSchemaVersion. +// UnsafeDetectSchemaVersion non-threadsafe version of DetectSchemaVersion. func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { vp := UnsafeReadStorageVersion(tx) if vp != nil { diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index c919751c2..bb4a0e3e9 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -31,6 +31,70 @@ var ( V3_7 = semver.Version{Major: 3, Minor: 7} ) +func TestValidate(t *testing.T) { + tcs := []struct { + name string + version semver.Version + // Overrides which keys should be set (default based on version) + overrideKeys func(tx backend.BatchTx) + expectError bool + expectErrorMsg string + }{ + // As storage version field was added in v3.6, for v3.5 we will not set it. + // For storage to be considered v3.5 it must have both confstate and term key set. 
+ { + name: `V3.4 schema is correct`, + version: V3_4, + }, + { + name: `V3.5 schema without confstate and term fields is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) {}, + }, + { + name: `V3.5 schema without term field is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) { + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + }, + }, + { + name: `V3.5 schema with all fields is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) { + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + }, + }, + { + name: `V3.6 schema is correct`, + version: V3_6, + }, + { + name: `V3.7 schema is unknown and should return error`, + version: V3_7, + expectError: true, + expectErrorMsg: "downgrades are not yet supported", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() + dataPath := setupBackendData(t, tc.version, tc.overrideKeys) + + b := backend.NewDefaultBackend(dataPath) + defer b.Close() + err := Validate(lg, b.BatchTx()) + if (err != nil) != tc.expectError { + t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError) + } + if err != nil && err.Error() != tc.expectErrorMsg { + t.Errorf("Validate(lg, tx) = %q, expected error message: %q", err, tc.expectErrorMsg) + } + }) + } +} + func TestMigrate(t *testing.T) { tcs := []struct { name string @@ -52,7 +116,7 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot determine storage version: missing confstate information`, + expectErrorMsg: `cannot detect storage schema version: missing confstate information`, }, { name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`, @@ -63,7 +127,7 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot determine storage version: missing term 
information`, + expectErrorMsg: `cannot detect storage schema version: missing term information`, }, { name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`, @@ -125,23 +189,9 @@ func TestMigrate(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { lg := zap.NewNop() - be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) - tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } - tx.Lock() - UnsafeCreateMetaBucket(tx) - if tc.overrideKeys != nil { - tc.overrideKeys(tx) - } else { - setupKeys(t, tx, tc.version) - } - tx.Unlock() - be.ForceCommit() - be.Close() + dataPath := setupBackendData(t, tc.version, tc.overrideKeys) - b := backend.NewDefaultBackend(tmpPath) + b := backend.NewDefaultBackend(dataPath) defer b.Close() err := Migrate(lg, b.BatchTx(), tc.targetVersion) if (err != nil) != tc.expectError { @@ -182,39 +232,31 @@ func TestMigrateIsReversible(t *testing.T) { for _, tc := range tcs { t.Run(tc.initialVersion.String(), func(t *testing.T) { lg := zap.NewNop() - be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + dataPath := setupBackendData(t, tc.initialVersion, nil) + + be := backend.NewDefaultBackend(dataPath) defer be.Close() tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } tx.Lock() - UnsafeCreateMetaBucket(tx) - setupKeys(t, tx, tc.initialVersion) + defer tx.Unlock() assertBucketState(t, tx, Meta, tc.state) - tx.Unlock() // Upgrade to current version - tx.Lock() - err := testUnsafeMigrate(lg, be.BatchTx(), currentVersion) + ver := localBinaryVersion() + err := testUnsafeMigrate(lg, tx, ver) if err != nil { - t.Errorf("Migrate(lg, tx, %q) returned error %+v", currentVersion, err) + t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err) } - assert.Equal(t, ¤tVersion, UnsafeReadStorageVersion(tx)) - tx.Unlock() + assert.Equal(t, &ver, UnsafeReadStorageVersion(tx)) // Downgrade back to initial version - tx.Lock() - err = testUnsafeMigrate(lg, 
be.BatchTx(), tc.initialVersion) + err = testUnsafeMigrate(lg, tx, tc.initialVersion) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err) } - tx.Unlock() // Assert that all changes were revered - tx.Lock() assertBucketState(t, tx, Meta, tc.state) - tx.Unlock() }) } } @@ -233,23 +275,38 @@ func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version return plan.unsafeExecute(lg, tx) } -func setupKeys(t *testing.T, tx backend.BatchTx, ver semver.Version) { +func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string { t.Helper() - switch ver { - case V3_4: - case V3_5: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - case V3_6: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - UnsafeSetStorageVersion(tx, &V3_6) - case V3_7: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - UnsafeSetStorageVersion(tx, &V3_7) - tx.UnsafePut(Meta, []byte("future-key"), []byte("")) - default: - t.Fatalf("Unsupported storage version") + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") } + tx.Lock() + UnsafeCreateMetaBucket(tx) + if overrideKeys != nil { + overrideKeys(tx) + } else { + switch version { + case V3_4: + case V3_5: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + case V3_6: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_6) + case V3_7: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_7) + 
tx.UnsafePut(Meta, []byte("future-key"), []byte("")) + default: + t.Fatalf("Unsupported storage version") + } + } + tx.Unlock() + be.ForceCommit() + be.Close() + return tmpPath }