mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13094 from serathius/etcd-version
[Version in Snapshot] Preserve etcd version in backend allowing etcdutl to read it from snapshot
This commit is contained in:
commit
dcf60888bc
@ -66,12 +66,13 @@ func newPrinterUnsupported(n string) printer {
|
||||
func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }
|
||||
|
||||
func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) {
|
||||
hdr = []string{"hash", "revision", "total keys", "total size"}
|
||||
hdr = []string{"hash", "revision", "total keys", "total size", "version"}
|
||||
rows = append(rows, []string{
|
||||
fmt.Sprintf("%x", ds.Hash),
|
||||
fmt.Sprint(ds.Revision),
|
||||
fmt.Sprint(ds.TotalKey),
|
||||
humanize.Bytes(uint64(ds.TotalSize)),
|
||||
ds.Version,
|
||||
})
|
||||
return hdr, rows
|
||||
}
|
||||
|
@ -27,4 +27,5 @@ func (p *fieldsPrinter) DBStatus(r snapshot.Status) {
|
||||
fmt.Println(`"Revision" :`, r.Revision)
|
||||
fmt.Println(`"Keys" :`, r.TotalKey)
|
||||
fmt.Println(`"Size" :`, r.TotalSize)
|
||||
fmt.Println(`"Version" :`, r.Version)
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/version"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/verify"
|
||||
"go.etcd.io/etcd/server/v3/wal"
|
||||
@ -106,6 +107,9 @@ type Status struct {
|
||||
Revision int64 `json:"revision"`
|
||||
TotalKey int `json:"totalKey"`
|
||||
TotalSize int64 `json:"totalSize"`
|
||||
// Version is equal to storageVersion of the snapshot
|
||||
// Empty if server does not supports versioned snapshots (<v3.6)
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
// Status returns the snapshot file information.
|
||||
@ -132,6 +136,10 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
|
||||
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
|
||||
}
|
||||
ds.TotalSize = tx.Size()
|
||||
v := version.ReadStorageVersionFromSnapshot(tx)
|
||||
if v != nil {
|
||||
ds.Version = v.String()
|
||||
}
|
||||
c := tx.Cursor()
|
||||
for next, _ := c.First(); next != nil; next, _ = c.Next() {
|
||||
b := tx.Bucket(next)
|
||||
|
@ -24,10 +24,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
confStateKey = []byte("confState")
|
||||
)
|
||||
|
||||
// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
|
||||
// confState in backend is persisted since etcd v3.5.
|
||||
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
|
||||
@ -36,20 +32,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
|
||||
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
|
||||
}
|
||||
|
||||
tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes)
|
||||
tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes)
|
||||
}
|
||||
|
||||
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
|
||||
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
|
||||
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
|
||||
keys, vals := tx.UnsafeRange(buckets.Meta, confStateKey, nil, 0)
|
||||
keys, vals := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(keys) != 1 {
|
||||
lg.Panic(
|
||||
"unexpected number of key: "+string(confStateKey)+" when getting cluster version from backend",
|
||||
"unexpected number of key: "+string(buckets.MetaConfStateName)+" when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"go.etcd.io/etcd/raft/v3"
|
||||
"go.etcd.io/etcd/server/v3/auth"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
|
||||
"go.etcd.io/etcd/server/v3/mvcc"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
|
||||
@ -100,6 +101,11 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
|
||||
const snapshotSendBufferSize = 32 * 1024
|
||||
|
||||
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
|
||||
ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx())
|
||||
storageVersion := ""
|
||||
if ver != nil {
|
||||
storageVersion = ver.String()
|
||||
}
|
||||
snap := ms.bg.Backend().Snapshot()
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
@ -125,7 +131,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
||||
ms.lg.Info("sending database snapshot to client",
|
||||
zap.Int64("total-bytes", total),
|
||||
zap.String("size", size),
|
||||
zap.String("etcd-version", version.Version),
|
||||
zap.String("storage-version", storageVersion),
|
||||
)
|
||||
for total-sent > 0 {
|
||||
// buffer just holds read bytes from stream
|
||||
@ -152,7 +158,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
||||
resp := &pb.SnapshotResponse{
|
||||
RemainingBytes: uint64(total - sent),
|
||||
Blob: buf[:n],
|
||||
Version: version.Version,
|
||||
Version: storageVersion,
|
||||
}
|
||||
if err = srv.Send(resp); err != nil {
|
||||
return togRPCError(err)
|
||||
@ -168,7 +174,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
||||
zap.Int64("total-bytes", total),
|
||||
zap.Int("checksum-size", len(sha)),
|
||||
)
|
||||
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: version.Version}
|
||||
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: storageVersion}
|
||||
if err := srv.Send(hresp); err != nil {
|
||||
return togRPCError(err)
|
||||
}
|
||||
@ -177,7 +183,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
||||
zap.Int64("total-bytes", total),
|
||||
zap.String("size", size),
|
||||
zap.String("took", humanize.Time(start)),
|
||||
zap.String("etcd-version", version.Version),
|
||||
zap.String("storage-version", storageVersion),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
@ -294,6 +294,9 @@ type EtcdServer struct {
|
||||
firstCommitInTermC chan struct{}
|
||||
|
||||
*AccessController
|
||||
|
||||
// Ensure that storage version is updated only once.
|
||||
storageVersionUpdated sync.Once
|
||||
}
|
||||
|
||||
type backendHooks struct {
|
||||
@ -2371,6 +2374,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
"saved snapshot",
|
||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||
)
|
||||
s.storageVersionUpdated.Do(func() {
|
||||
err := serverversion.UpdateStorageVersion(s.lg, s.be.BatchTx())
|
||||
if err != nil {
|
||||
s.lg.Warn("failed to update storage version", zap.Error(err))
|
||||
}
|
||||
})
|
||||
|
||||
// When sending a snapshot, etcd will pause compaction.
|
||||
// After receives a snapshot, the slow follower needs to get all the entries right after
|
||||
|
16
server/etcdserver/version/doc.go
Normal file
16
server/etcdserver/version/doc.go
Normal file
@ -0,0 +1,16 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package version provides functions for getting/saving storage version.
|
||||
package version
|
108
server/etcdserver/version/version.go
Normal file
108
server/etcdserver/version/version.go
Normal file
@ -0,0 +1,108 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package version
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
)
|
||||
|
||||
var (
|
||||
V3_5 = semver.Version{Major: 3, Minor: 5}
|
||||
V3_6 = semver.Version{Major: 3, Minor: 6}
|
||||
)
|
||||
|
||||
// UpdateStorageVersion updates storage version.
|
||||
func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
v, err := detectStorageVersion(lg, tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine storage version: %w", err)
|
||||
}
|
||||
switch *v {
|
||||
case V3_5:
|
||||
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
|
||||
// All meta keys introduced in v3.6 should be filled in here.
|
||||
unsafeSetStorageVersion(tx, &V3_6)
|
||||
case V3_6:
|
||||
default:
|
||||
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
|
||||
v := unsafeReadStorageVersion(tx)
|
||||
if v != nil {
|
||||
return v, nil
|
||||
}
|
||||
_, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
|
||||
if len(cfs) == 0 {
|
||||
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
|
||||
}
|
||||
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
|
||||
if len(ts) == 0 {
|
||||
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
|
||||
}
|
||||
copied := V3_5
|
||||
return &copied, nil
|
||||
}
|
||||
|
||||
// ReadStorageVersion loads storage version from given backend transaction.
|
||||
// Populated since v3.6
|
||||
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
return unsafeReadStorageVersion(tx)
|
||||
}
|
||||
|
||||
// unsafeReadStorageVersion loads storage version from given backend transaction.
|
||||
// Populated since v3.6
|
||||
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
}
|
||||
v, err := semver.NewVersion(string(vs[0]))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
|
||||
// Populated since v3.6
|
||||
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
|
||||
v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName)
|
||||
version, err := semver.NewVersion(string(v))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return version
|
||||
}
|
||||
|
||||
// unsafeSetStorageVersion updates etcd storage version in backend.
|
||||
// Populated since v3.6
|
||||
func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
|
||||
sv := semver.Version{Major: v.Major, Minor: v.Minor}
|
||||
tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String()))
|
||||
}
|
227
server/etcdserver/version/version_test.go
Normal file
227
server/etcdserver/version/version_test.go
Normal file
@ -0,0 +1,227 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package version
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/stretchr/testify/assert"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
)
|
||||
|
||||
func TestUpdateStorageVersion(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
version string
|
||||
metaKeys [][]byte
|
||||
expectVersion *semver.Version
|
||||
expectError bool
|
||||
expectedErrorMsg string
|
||||
expectedMetaKeys [][]byte
|
||||
}{
|
||||
{
|
||||
name: `Backend before 3.6 without "confState" should be rejected`,
|
||||
version: "",
|
||||
expectVersion: nil,
|
||||
expectError: true,
|
||||
expectedErrorMsg: `cannot determine storage version: missing "confState" key`,
|
||||
},
|
||||
{
|
||||
name: `Backend before 3.6 without "term" should be rejected`,
|
||||
version: "",
|
||||
metaKeys: [][]byte{buckets.MetaConfStateName},
|
||||
expectVersion: nil,
|
||||
expectError: true,
|
||||
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
|
||||
expectedMetaKeys: [][]byte{buckets.MetaConfStateName},
|
||||
},
|
||||
{
|
||||
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
|
||||
version: "",
|
||||
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
|
||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||
},
|
||||
{
|
||||
name: "Backend in 3.6.0 should be skipped",
|
||||
version: "3.6.0",
|
||||
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||
},
|
||||
{
|
||||
name: "Backend with current version should be skipped",
|
||||
version: version.Version,
|
||||
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||
},
|
||||
{
|
||||
name: "Backend in 3.7.0 should be skipped",
|
||||
version: "3.7.0",
|
||||
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
|
||||
expectVersion: &semver.Version{Major: 3, Minor: 7},
|
||||
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
if tx == nil {
|
||||
t.Fatal("batch tx is nil")
|
||||
}
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(buckets.Meta)
|
||||
for _, k := range tc.metaKeys {
|
||||
tx.UnsafePut(buckets.Meta, k, []byte{})
|
||||
}
|
||||
if tc.version != "" {
|
||||
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||
}
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
b := backend.NewDefaultBackend(tmpPath)
|
||||
defer b.Close()
|
||||
err := UpdateStorageVersion(zap.NewNop(), b.BatchTx())
|
||||
if (err != nil) != tc.expectError {
|
||||
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
|
||||
}
|
||||
if err != nil && err.Error() != tc.expectedErrorMsg {
|
||||
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
|
||||
}
|
||||
v := unsafeReadStorageVersion(b.BatchTx())
|
||||
assert.Equal(t, tc.expectVersion, v)
|
||||
keys, _ := b.BatchTx().UnsafeRange(buckets.Meta, []byte("a"), []byte("z"), 0)
|
||||
assert.ElementsMatch(t, tc.expectedMetaKeys, keys)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestVersion ensures that unsafeSetStorageVersion/unsafeReadStorageVersion work well together.
|
||||
func TestVersion(t *testing.T) {
|
||||
tcs := []struct {
|
||||
version string
|
||||
expectVersion string
|
||||
}{
|
||||
{
|
||||
version: "3.5.0",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-alpha",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-beta.0",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-rc.1",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.1",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.version, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
if tx == nil {
|
||||
t.Fatal("batch tx is nil")
|
||||
}
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(buckets.Meta)
|
||||
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
b := backend.NewDefaultBackend(tmpPath)
|
||||
defer b.Close()
|
||||
v := unsafeReadStorageVersion(b.BatchTx())
|
||||
|
||||
assert.Equal(t, tc.expectVersion, v.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestVersionSnapshot ensures that unsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
|
||||
func TestVersionSnapshot(t *testing.T) {
|
||||
tcs := []struct {
|
||||
version string
|
||||
expectVersion string
|
||||
}{
|
||||
{
|
||||
version: "3.5.0",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-alpha",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-beta.0",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
{
|
||||
version: "3.5.0-rc.1",
|
||||
expectVersion: "3.5.0",
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.version, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
if tx == nil {
|
||||
t.Fatal("batch tx is nil")
|
||||
}
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(buckets.Meta)
|
||||
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
db, err := bolt.Open(tmpPath, 0400, &bolt.Options{ReadOnly: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
var ver *semver.Version
|
||||
if err = db.View(func(tx *bolt.Tx) error {
|
||||
ver = ReadStorageVersionFromSnapshot(tx)
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.expectVersion, ver.String())
|
||||
|
||||
})
|
||||
}
|
||||
}
|
@ -67,8 +67,14 @@ func (b bucket) String() string { return string(b.Name()) }
|
||||
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
|
||||
|
||||
var (
|
||||
// Since v3.0
|
||||
MetaConsistentIndexKeyName = []byte("consistent_index")
|
||||
MetaTermKeyName = []byte("term")
|
||||
// Since v3.5
|
||||
MetaTermKeyName = []byte("term")
|
||||
MetaConfStateName = []byte("confState")
|
||||
// Since v3.6
|
||||
MetaStorageVersionName = []byte("storageVersion")
|
||||
// Before adding new meta key please update server/etcdserver/version
|
||||
)
|
||||
|
||||
// DefaultIgnores defines buckets & keys to ignore in hash checking.
|
||||
|
@ -204,7 +204,6 @@ func testIssue6361(t *testing.T, etcdutl bool) {
|
||||
t.Log("etcdctl saving snapshot...")
|
||||
if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
|
||||
fmt.Sprintf("Snapshot saved at %s", fpath),
|
||||
"Server version 3.6.0",
|
||||
); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -288,3 +287,31 @@ func testIssue6361(t *testing.T, etcdutl bool) {
|
||||
}
|
||||
t.Log("Test logic done")
|
||||
}
|
||||
|
||||
// For storageVersion to be stored, all fields expected 3.6 fields need to be set. This happens after first WAL snapshot.
|
||||
// In this test we lower snapshotCount to 1 to ensure WAL snapshot is triggered.
|
||||
func TestCtlV3SnapshotVersion(t *testing.T) {
|
||||
testCtl(t, snapshotVersionTest, withCfg(etcdProcessClusterConfig{snapshotCount: 1}))
|
||||
}
|
||||
func TestCtlV3SnapshotVersionEtcdutl(t *testing.T) {
|
||||
testCtl(t, snapshotVersionTest, withEtcdutl(), withCfg(etcdProcessClusterConfig{snapshotCount: 1}))
|
||||
}
|
||||
|
||||
func snapshotVersionTest(cx ctlCtx) {
|
||||
maintenanceInitKeys(cx)
|
||||
|
||||
fpath := filepath.Join(cx.t.TempDir(), "snapshot")
|
||||
defer os.RemoveAll(fpath)
|
||||
|
||||
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
|
||||
cx.t.Fatalf("snapshotVersionTest ctlV3SnapshotSave error (%v)", err)
|
||||
}
|
||||
|
||||
st, err := getSnapshotStatus(cx, fpath)
|
||||
if err != nil {
|
||||
cx.t.Fatalf("snapshotVersionTest getSnapshotStatus error (%v)", err)
|
||||
}
|
||||
if st.Version != "3.6.0" {
|
||||
cx.t.Fatalf("expected %q, got %q", "3.6.0", st.Version)
|
||||
}
|
||||
}
|
||||
|
@ -249,7 +249,8 @@ func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Co
|
||||
func TestMaintenanceSnapshotWithVersionVersion(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
// Set SnapshotCount to 1 to force raft snapshot to ensure that storage version is set
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, SnapshotCount: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
// reading snapshot with canceled context should error out
|
||||
@ -258,7 +259,7 @@ func TestMaintenanceSnapshotWithVersionVersion(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Snapshot.Close()
|
||||
if resp.Version != version.Version {
|
||||
if resp.Version != "3.6.0" {
|
||||
t.Errorf("unexpected version, expected %q, got %q", version.Version, resp.Version)
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
@ -39,7 +38,7 @@ import (
|
||||
func TestSaveSnapshotFilePermissions(t *testing.T) {
|
||||
expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
|
||||
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
||||
_, dbPath := createSnapshotFile(t, kvs)
|
||||
_, dbPath := createSnapshotFile(t, newEmbedConfig(t), kvs)
|
||||
defer os.RemoveAll(dbPath)
|
||||
|
||||
dbInfo, err := os.Stat(dbPath)
|
||||
@ -53,14 +52,17 @@ func TestSaveSnapshotFilePermissions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSaveSnapshotVersion ensures that the snapshot returns proper etcd version.
|
||||
// TestSaveSnapshotVersion ensures that the snapshot returns proper storage version.
|
||||
func TestSaveSnapshotVersion(t *testing.T) {
|
||||
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
||||
ver, dbPath := createSnapshotFile(t, kvs)
|
||||
cfg := newEmbedConfig(t)
|
||||
// Force raft snapshot to ensure that storage version is set
|
||||
cfg.SnapshotCount = 1
|
||||
ver, dbPath := createSnapshotFile(t, cfg, kvs)
|
||||
defer os.RemoveAll(dbPath)
|
||||
|
||||
if ver != version.Version {
|
||||
t.Fatalf("expected snapshot version %s, got %s:", version.Version, ver)
|
||||
if ver != "3.6.0" {
|
||||
t.Fatalf("expected snapshot version %s, got %s:", "3.6.0", ver)
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,19 +70,23 @@ type kv struct {
|
||||
k, v string
|
||||
}
|
||||
|
||||
// creates a snapshot file and returns the file path.
|
||||
func createSnapshotFile(t *testing.T, kvs []kv) (version string, dbPath string) {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||
func newEmbedConfig(t *testing.T) *embed.Config {
|
||||
clusterN := 1
|
||||
urls := newEmbedURLs(clusterN * 2)
|
||||
cURLs, pURLs := urls[:clusterN], urls[clusterN:]
|
||||
|
||||
cfg := integration.NewEmbedConfig(t, "default")
|
||||
cfg.ClusterState = "new"
|
||||
cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
|
||||
cfg.LPUrls, cfg.APUrls = pURLs, pURLs
|
||||
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
|
||||
return cfg
|
||||
}
|
||||
|
||||
// creates a snapshot file and returns the file path.
|
||||
func createSnapshotFile(t *testing.T, cfg *embed.Config, kvs []kv) (version string, dbPath string) {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||
|
||||
srv, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user