From c65c1ea5594a69cb402053bcece7c9c7a1c8608f Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 8 Feb 2024 13:03:25 -0800 Subject: [PATCH] Add function to migrate 3.5 data online. Signed-off-by: Siyuan Zhang --- etcdserver/backend.go | 2 +- etcdserver/server.go | 2 +- mvcc/downgrade.go | 128 +++++++++++++++++++++++++++++++++++++++++ mvcc/downgrade_test.go | 95 ++++++++++++++++++++++++++++++ mvcc/kvstore.go | 17 +++++- mvcc/kvstore_test.go | 10 +++- version/version.go | 3 + 7 files changed, 253 insertions(+), 4 deletions(-) create mode 100644 mvcc/downgrade.go create mode 100644 mvcc/downgrade_test.go diff --git a/etcdserver/backend.go b/etcdserver/backend.go index aa443cac8..6da17e0c4 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -103,7 +103,7 @@ func openBackend(cfg ServerConfig) backend.Backend { // case, replace the db with the snapshot db sent by the leader. func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { var cIndex consistentIndex - kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit, NextClusterVersionCompatible: cfg.NextClusterVersionCompatible}) defer kv.Close() if snapshot.Metadata.Index <= kv.ConsistentIndex() { return oldbe, nil diff --git a/etcdserver/server.go b/etcdserver/server.go index 7dbc56a6a..5902bf6c2 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -578,7 +578,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) - srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + srv.kv = mvcc.New(srv.getLogger(), srv.be, 
srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit, NextClusterVersionCompatible: cfg.NextClusterVersionCompatible}) if beExist { kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade diff --git a/mvcc/downgrade.go b/mvcc/downgrade.go new file mode 100644 index 000000000..a57ee2ff6 --- /dev/null +++ b/mvcc/downgrade.go @@ -0,0 +1,128 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mvcc + +import ( + "encoding/binary" + "encoding/json" + + "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/version" + "go.uber.org/zap" +) + +var ( + // meta bucket keys since 3.5 + confStateKeyName = []byte("confState") + termKeyName = []byte("term") + // meta bucket keys since 3.6 + storageVersionKeyName = []byte("storageVersion") +) + +// unsafeDowngradeMetaBucket deletes the 3.5-specific keys to make the backend fully compatible with 3.4. +func unsafeDowngradeMetaBucket(lg *zap.Logger, tx backend.BatchTx) { + if lg != nil { + lg.Info( + "downgrade meta bucket", + zap.ByteStrings("remove-keys", [][]byte{confStateKeyName, termKeyName}), + ) + } else { + plog.Printf("downgrade meta bucket: remove keys [%s, %s]", string(confStateKeyName), string(termKeyName)) + } + tx.UnsafeDelete(metaBucketName, confStateKeyName) + tx.UnsafeDelete(metaBucketName, termKeyName) +} + +// unsafeReadStorageVersion loads storage version from given backend transaction. +// Populated since v3.6 +func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version { + _, vs := tx.UnsafeRange(metaBucketName, storageVersionKeyName, nil, 0) + if len(vs) == 0 { + return nil + } + v, err := semver.NewVersion(string(vs[0])) + if err != nil { + return nil + } + return v +} + +// unsafeReadTerm loads term from given transaction. +// returns 0 if the data are not found. +// Term is persisted since v3.5. +func unsafeReadTerm(tx backend.ReadTx) uint64 { + _, ts := tx.UnsafeRange(metaBucketName, termKeyName, nil, 0) + if len(ts) == 0 { + return 0 + } + return binary.BigEndian.Uint64(ts[0]) +} + +// unsafeConfStateFromBackend retrieves ConfState from the backend. +// Returns nil if confState in backend is not persisted (e.g. 
backend written by 0 { + b.tx.rangeRespc <- rangeResp{[][]byte{storageVersionKeyName}, [][]byte{[]byte(tt.storageVersion)}} + } else { + b.tx.rangeRespc <- rangeResp{nil, nil} + } + if tt.confState != nil { + confStateBytes, err := json.Marshal(tt.confState) + if err != nil { + t.Fatalf("Cannot marshal raftpb.ConfState, conf-state = %s, err = %s\n", tt.confState.String(), err) + } + b.tx.rangeRespc <- rangeResp{[][]byte{confStateKeyName}, [][]byte{confStateBytes}} + } else { + b.tx.rangeRespc <- rangeResp{nil, nil} + } + if tt.term > 0 { + bs2 := make([]byte, 8) + binary.BigEndian.PutUint64(bs2, tt.term) + b.tx.rangeRespc <- rangeResp{[][]byte{termKeyName}, [][]byte{bs2}} + } else { + b.tx.rangeRespc <- rangeResp{nil, nil} + } + + v := UnsafeDetectSchemaVersion(s.lg, b.BatchTx()) + expectedVersion := semver.Must(semver.NewVersion(tt.expectedVersion)) + if !v.Equal(*expectedVersion) { + t.Fatalf("want version: %s, got: %s", expectedVersion, v) + } + } +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 6dc4f30bc..19d0825dd 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/schedule" "go.etcd.io/etcd/pkg/traceutil" + "go.etcd.io/etcd/version" "github.com/coreos/pkg/capnslog" "go.uber.org/zap" @@ -71,7 +72,8 @@ type ConsistentIndexGetter interface { } type StoreConfig struct { - CompactionBatchLimit int + CompactionBatchLimit int + NextClusterVersionCompatible bool } type store struct { @@ -379,6 +381,19 @@ func (s *store) restore() error { tx := s.b.BatchTx() tx.Lock() + v := UnsafeDetectSchemaVersion(s.lg, tx) + if !v.Equal(version.V3_4) && !v.Equal(version.V3_5) { + if s.lg != nil { + s.lg.Panic("unsupported storage version", + zap.String("storage-version", v.String())) + } else { + plog.Panicf("unsupported storage version: %s\n", v.String()) + } + } + if s.cfg.NextClusterVersionCompatible && v.Equal(version.V3_5) { + unsafeDowngradeMetaBucket(s.lg, tx) + } + _, finishedCompactBytes := 
tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { s.revMu.Lock() diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index f776b7fb6..35d40c2b9 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -382,6 +382,11 @@ func TestStoreRestore(t *testing.T) { if err != nil { t.Fatal(err) } + + b.tx.rangeRespc <- rangeResp{nil, nil} + b.tx.rangeRespc <- rangeResp{nil, nil} + b.tx.rangeRespc <- rangeResp{nil, nil} + b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} @@ -397,6 +402,9 @@ func TestStoreRestore(t *testing.T) { t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ + {Name: "range", Params: []interface{}{metaBucketName, storageVersionKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{metaBucketName, confStateKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{metaBucketName, termKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, @@ -903,7 +911,7 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte { func newFakeStore() *store { b := &fakeBackend{&fakeBatchTx{ Recorder: &testutil.RecorderBuffered{}, - rangeRespc: make(chan rangeResp, 5)}} + rangeRespc: make(chan rangeResp, 8)}} fi := &fakeIndex{ Recorder: &testutil.RecorderBuffered{}, indexGetRespc: make(chan indexGetResp, 1), diff --git a/version/version.go b/version/version.go index 555003a1b..c5839a2cb 100644 --- a/version/version.go +++ b/version/version.go 
@@ -31,6 +31,9 @@ var ( // Git SHA Value will be set during build GitSHA = "Not provided (use ./build instead of go build)" + + V3_4 = semver.Version{Major: 3, Minor: 4} + V3_5 = semver.Version{Major: 3, Minor: 5} ) func init() {