Add function to migrate 3.5 data online.

Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
Siyuan Zhang 2024-02-08 13:03:25 -08:00
parent e7da7ebf7e
commit c65c1ea559
7 changed files with 253 additions and 4 deletions

View File

@ -103,7 +103,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit, NextClusterVersionCompatible: cfg.NextClusterVersionCompatible})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil

View File

@ -578,7 +578,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit, NextClusterVersionCompatible: cfg.NextClusterVersionCompatible})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade

128
mvcc/downgrade.go Normal file
View File

@ -0,0 +1,128 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"encoding/binary"
"encoding/json"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/version"
"go.uber.org/zap"
)
var (
// Meta bucket keys written since 3.5. Their presence is what
// UnsafeDetectSchemaVersion uses to tell a 3.5 backend from a 3.4 one, and
// they are the keys removed by unsafeDowngradeMetaBucket.
//
// confStateKeyName holds the JSON-encoded raftpb.ConfState.
confStateKeyName = []byte("confState")
// termKeyName holds the term, read back as a big-endian uint64.
termKeyName = []byte("term")
// Meta bucket keys written since 3.6.
//
// storageVersionKeyName holds the storage version as a semver string.
storageVersionKeyName = []byte("storageVersion")
)
// unsafeDowngradeMetaBucket deletes the 3.5-specific keys (confState and term)
// from the meta bucket so that the backend is fully compatible with 3.4.
//
// The removal is logged through lg when it is non-nil, otherwise through the
// package-level plog logger. The deletes are issued on tx without any locking
// here; the caller is presumably expected to hold the batch transaction lock.
func unsafeDowngradeMetaBucket(lg *zap.Logger, tx backend.BatchTx) {
	staleKeys := [][]byte{confStateKeyName, termKeyName}

	if lg == nil {
		plog.Printf("downgrade meta bucket: remove keys [%s, %s]", string(confStateKeyName), string(termKeyName))
	} else {
		lg.Info("downgrade meta bucket", zap.ByteStrings("remove-keys", staleKeys))
	}

	// Delete in the same order the keys are logged: confState first, then term.
	for _, key := range staleKeys {
		tx.UnsafeDelete(metaBucketName, key)
	}
}
// unsafeReadStorageVersion loads the storage version from the meta bucket using
// the given backend transaction. The storage version is populated since v3.6.
//
// It returns nil both when the key is absent and when the stored value cannot
// be parsed as a semantic version.
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
	_, vals := tx.UnsafeRange(metaBucketName, storageVersionKeyName, nil, 0)
	if len(vals) > 0 {
		// An unparsable value is treated the same as a missing one.
		if parsed, err := semver.NewVersion(string(vals[0])); err == nil {
			return parsed
		}
	}
	return nil
}
// unsafeReadTerm loads the term from the meta bucket using the given
// transaction. The term is persisted since v3.5.
//
// It returns 0 when no term is stored. A stored value is decoded as a
// big-endian uint64.
func unsafeReadTerm(tx backend.ReadTx) uint64 {
	var term uint64
	_, vals := tx.UnsafeRange(metaBucketName, termKeyName, nil, 0)
	if len(vals) > 0 {
		term = binary.BigEndian.Uint64(vals[0])
	}
	return term
}
// unsafeConfStateFromBackend retrieves the ConfState stored in the meta bucket
// of the backend.
//
// It returns nil if no confState is persisted (e.g. the backend was written by
// etcd < v3.5). It panics if the range lookup yields more than one key, or if
// the stored value cannot be unmarshalled from JSON into a raftpb.ConfState.
// Panics are reported through lg when it is non-nil, otherwise through the
// package-level plog logger.
func unsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
	keys, vals := tx.UnsafeRange(metaBucketName, confStateKeyName, nil, 0)
	if len(keys) == 0 {
		return nil
	}

	if len(keys) != 1 {
		// The message names confState: that is what is being read here.
		if lg != nil {
			lg.Panic(
				"unexpected number of key: "+string(confStateKeyName)+" when getting confState from backend",
				zap.Int("number-of-key", len(keys)),
			)
		} else {
			plog.Panicf("unexpected number of key: %s when getting confState from backend, number-of-key=%d",
				string(confStateKeyName), len(keys))
		}
	}

	var confState raftpb.ConfState
	if err := json.Unmarshal(vals[0], &confState); err != nil {
		if lg != nil {
			lg.Panic(
				"cannot unmarshal confState json retrieved from the backend",
				zap.ByteString("conf-state-json", vals[0]),
				zap.Error(err),
			)
		} else {
			plog.Panicf("cannot unmarshal confState json retrieved from the backend, err: %s", err)
		}
	}
	return &confState
}
// UnsafeDetectSchemaVersion returns the version of the storage schema. The
// returned value depends on the etcd version that created the backend:
//   - v3.6 and newer: the persisted storage version is returned.
//   - v3.5: V3_5 is returned if the backend includes any storage field added
//     in v3.5 (confState or a non-zero term).
//   - v3.4: V3_4 is returned if the backend includes none of the storage
//     fields added in v3.5.
//
// Note that a data schema older than 3.4 is also reported as v3.4, which means
// the data will be read as it is and will not be modified by the downgrade code.
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) semver.Version {
	if stored := unsafeReadStorageVersion(tx); stored != nil {
		return *stored
	}

	// Both fields are always read, confState first and then term.
	confState := unsafeConfStateFromBackend(lg, tx)
	term := unsafeReadTerm(tx)

	if confState != nil || term != 0 {
		return version.V3_5
	}
	// Both confState and term are missing: assume it's v3.4.
	return version.V3_4
}

95
mvcc/downgrade_test.go Normal file
View File

@ -0,0 +1,95 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"encoding/binary"
"encoding/json"
"testing"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/raft/raftpb"
)
// TestUnsafeDetectSchemaVersion checks the schema version reported by
// UnsafeDetectSchemaVersion for different combinations of storage version,
// term and confState in the meta bucket.
//
// For every case three canned range responses are queued on the fake backend
// in the order the function reads them: storage version, confState, term.
func TestUnsafeDetectSchemaVersion(t *testing.T) {
	cases := []struct {
		storageVersion  string
		term            uint64
		confState       *raftpb.ConfState
		expectedVersion string
	}{
		{storageVersion: "3.6.1", expectedVersion: "3.6.1"},
		{expectedVersion: "3.4.0"},
		{term: 1, expectedVersion: "3.5.0"},
		{term: 1, confState: &raftpb.ConfState{Voters: []uint64{1}}, expectedVersion: "3.5.0"},
		{confState: &raftpb.ConfState{Voters: []uint64{1}}, expectedVersion: "3.5.0"},
		{term: 0, confState: &raftpb.ConfState{Voters: []uint64{}}, expectedVersion: "3.5.0"},
		{term: 1, confState: &raftpb.ConfState{Voters: []uint64{}}, expectedVersion: "3.5.0"},
	}

	for _, tc := range cases {
		st := newFakeStore()
		fb := st.b.(*fakeBackend)

		// Response for the storage version lookup.
		versionResp := rangeResp{nil, nil}
		if len(tc.storageVersion) > 0 {
			versionResp = rangeResp{[][]byte{storageVersionKeyName}, [][]byte{[]byte(tc.storageVersion)}}
		}
		fb.tx.rangeRespc <- versionResp

		// Response for the confState lookup.
		confStateResp := rangeResp{nil, nil}
		if tc.confState != nil {
			encoded, err := json.Marshal(tc.confState)
			if err != nil {
				t.Fatalf("Cannot marshal raftpb.ConfState, conf-state = %s, err = %s\n", tc.confState.String(), err)
			}
			confStateResp = rangeResp{[][]byte{confStateKeyName}, [][]byte{encoded}}
		}
		fb.tx.rangeRespc <- confStateResp

		// Response for the term lookup.
		termResp := rangeResp{nil, nil}
		if tc.term > 0 {
			termBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(termBytes, tc.term)
			termResp = rangeResp{[][]byte{termKeyName}, [][]byte{termBytes}}
		}
		fb.tx.rangeRespc <- termResp

		got := UnsafeDetectSchemaVersion(st.lg, fb.BatchTx())
		expectedVersion := semver.Must(semver.NewVersion(tc.expectedVersion))
		if !got.Equal(*expectedVersion) {
			t.Fatalf("want version: %s, got: %s", expectedVersion, got)
		}
	}
}

View File

@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/schedule"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/version"
"github.com/coreos/pkg/capnslog"
"go.uber.org/zap"
@ -71,7 +72,8 @@ type ConsistentIndexGetter interface {
}
type StoreConfig struct {
CompactionBatchLimit int
CompactionBatchLimit int
NextClusterVersionCompatible bool
}
type store struct {
@ -379,6 +381,19 @@ func (s *store) restore() error {
tx := s.b.BatchTx()
tx.Lock()
v := UnsafeDetectSchemaVersion(s.lg, tx)
if !v.Equal(version.V3_4) && !v.Equal(version.V3_5) {
if s.lg != nil {
s.lg.Panic("unsupported storage version",
zap.String("storage-version", v.String()))
} else {
plog.Panicf("unsupported storage version: %s\n", v.String())
}
}
if s.cfg.NextClusterVersionCompatible && v.Equal(version.V3_5) {
unsafeDowngradeMetaBucket(s.lg, tx)
}
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.revMu.Lock()

View File

@ -382,6 +382,11 @@ func TestStoreRestore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{nil, nil}
b.tx.rangeRespc <- rangeResp{nil, nil}
b.tx.rangeRespc <- rangeResp{nil, nil}
b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
@ -397,6 +402,9 @@ func TestStoreRestore(t *testing.T) {
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{Name: "range", Params: []interface{}{metaBucketName, storageVersionKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{metaBucketName, confStateKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{metaBucketName, termKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
@ -903,7 +911,7 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
func newFakeStore() *store {
b := &fakeBackend{&fakeBatchTx{
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
rangeRespc: make(chan rangeResp, 8)}}
fi := &fakeIndex{
Recorder: &testutil.RecorderBuffered{},
indexGetRespc: make(chan indexGetResp, 1),

View File

@ -31,6 +31,9 @@ var (
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
V3_4 = semver.Version{Major: 3, Minor: 4}
V3_5 = semver.Version{Major: 3, Minor: 5}
)
func init() {