mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server,etcdutl: Preserve etcd version in backend allowing etcdutl to read it from snapshot
This commit is contained in:
parent
3f02686619
commit
e2740b4afa
@ -66,12 +66,13 @@ func newPrinterUnsupported(n string) printer {
|
|||||||
func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }
|
func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }
|
||||||
|
|
||||||
func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) {
|
func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) {
|
||||||
hdr = []string{"hash", "revision", "total keys", "total size"}
|
hdr = []string{"hash", "revision", "total keys", "total size", "version"}
|
||||||
rows = append(rows, []string{
|
rows = append(rows, []string{
|
||||||
fmt.Sprintf("%x", ds.Hash),
|
fmt.Sprintf("%x", ds.Hash),
|
||||||
fmt.Sprint(ds.Revision),
|
fmt.Sprint(ds.Revision),
|
||||||
fmt.Sprint(ds.TotalKey),
|
fmt.Sprint(ds.TotalKey),
|
||||||
humanize.Bytes(uint64(ds.TotalSize)),
|
humanize.Bytes(uint64(ds.TotalSize)),
|
||||||
|
ds.Version,
|
||||||
})
|
})
|
||||||
return hdr, rows
|
return hdr, rows
|
||||||
}
|
}
|
||||||
|
@ -27,4 +27,5 @@ func (p *fieldsPrinter) DBStatus(r snapshot.Status) {
|
|||||||
fmt.Println(`"Revision" :`, r.Revision)
|
fmt.Println(`"Revision" :`, r.Revision)
|
||||||
fmt.Println(`"Keys" :`, r.TotalKey)
|
fmt.Println(`"Keys" :`, r.TotalKey)
|
||||||
fmt.Println(`"Size" :`, r.TotalSize)
|
fmt.Println(`"Size" :`, r.TotalSize)
|
||||||
|
fmt.Println(`"Version" :`, r.Version)
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/version"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
"go.etcd.io/etcd/server/v3/verify"
|
"go.etcd.io/etcd/server/v3/verify"
|
||||||
"go.etcd.io/etcd/server/v3/wal"
|
"go.etcd.io/etcd/server/v3/wal"
|
||||||
@ -106,6 +107,9 @@ type Status struct {
|
|||||||
Revision int64 `json:"revision"`
|
Revision int64 `json:"revision"`
|
||||||
TotalKey int `json:"totalKey"`
|
TotalKey int `json:"totalKey"`
|
||||||
TotalSize int64 `json:"totalSize"`
|
TotalSize int64 `json:"totalSize"`
|
||||||
|
// Version is equal to storageVersion of the snapshot
|
||||||
|
// Empty if server does not supports versioned snapshots (<v3.6)
|
||||||
|
Version string `json:"version"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status returns the snapshot file information.
|
// Status returns the snapshot file information.
|
||||||
@ -132,6 +136,10 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
|
|||||||
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
|
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
|
||||||
}
|
}
|
||||||
ds.TotalSize = tx.Size()
|
ds.TotalSize = tx.Size()
|
||||||
|
v := version.ReadStorageVersionFromSnapshot(tx)
|
||||||
|
if v != nil {
|
||||||
|
ds.Version = v.String()
|
||||||
|
}
|
||||||
c := tx.Cursor()
|
c := tx.Cursor()
|
||||||
for next, _ := c.First(); next != nil; next, _ = c.Next() {
|
for next, _ := c.First(); next != nil; next, _ = c.Next() {
|
||||||
b := tx.Bucket(next)
|
b := tx.Bucket(next)
|
||||||
|
@ -24,10 +24,6 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
confStateKey = []byte("confState")
|
|
||||||
)
|
|
||||||
|
|
||||||
// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
|
// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
|
||||||
// confState in backend is persisted since etcd v3.5.
|
// confState in backend is persisted since etcd v3.5.
|
||||||
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
|
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
|
||||||
@ -36,20 +32,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
|
|||||||
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
|
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes)
|
tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
|
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
|
||||||
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
|
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
|
||||||
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
|
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
|
||||||
keys, vals := tx.UnsafeRange(buckets.Meta, confStateKey, nil, 0)
|
keys, vals := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(keys) != 1 {
|
if len(keys) != 1 {
|
||||||
lg.Panic(
|
lg.Panic(
|
||||||
"unexpected number of key: "+string(confStateKey)+" when getting cluster version from backend",
|
"unexpected number of key: "+string(buckets.MetaConfStateName)+" when getting cluster version from backend",
|
||||||
zap.Int("number-of-key", len(keys)),
|
zap.Int("number-of-key", len(keys)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"go.etcd.io/etcd/raft/v3"
|
"go.etcd.io/etcd/raft/v3"
|
||||||
"go.etcd.io/etcd/server/v3/auth"
|
"go.etcd.io/etcd/server/v3/auth"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||||
|
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc"
|
"go.etcd.io/etcd/server/v3/mvcc"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
|
|
||||||
@ -100,6 +101,11 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
|
|||||||
const snapshotSendBufferSize = 32 * 1024
|
const snapshotSendBufferSize = 32 * 1024
|
||||||
|
|
||||||
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
|
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
|
||||||
|
ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx())
|
||||||
|
storageVersion := ""
|
||||||
|
if ver != nil {
|
||||||
|
storageVersion = ver.String()
|
||||||
|
}
|
||||||
snap := ms.bg.Backend().Snapshot()
|
snap := ms.bg.Backend().Snapshot()
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
@ -125,7 +131,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
|||||||
ms.lg.Info("sending database snapshot to client",
|
ms.lg.Info("sending database snapshot to client",
|
||||||
zap.Int64("total-bytes", total),
|
zap.Int64("total-bytes", total),
|
||||||
zap.String("size", size),
|
zap.String("size", size),
|
||||||
zap.String("etcd-version", version.Version),
|
zap.String("storage-version", storageVersion),
|
||||||
)
|
)
|
||||||
for total-sent > 0 {
|
for total-sent > 0 {
|
||||||
// buffer just holds read bytes from stream
|
// buffer just holds read bytes from stream
|
||||||
@ -152,7 +158,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
|||||||
resp := &pb.SnapshotResponse{
|
resp := &pb.SnapshotResponse{
|
||||||
RemainingBytes: uint64(total - sent),
|
RemainingBytes: uint64(total - sent),
|
||||||
Blob: buf[:n],
|
Blob: buf[:n],
|
||||||
Version: version.Version,
|
Version: storageVersion,
|
||||||
}
|
}
|
||||||
if err = srv.Send(resp); err != nil {
|
if err = srv.Send(resp); err != nil {
|
||||||
return togRPCError(err)
|
return togRPCError(err)
|
||||||
@ -168,7 +174,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
|||||||
zap.Int64("total-bytes", total),
|
zap.Int64("total-bytes", total),
|
||||||
zap.Int("checksum-size", len(sha)),
|
zap.Int("checksum-size", len(sha)),
|
||||||
)
|
)
|
||||||
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: version.Version}
|
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: storageVersion}
|
||||||
if err := srv.Send(hresp); err != nil {
|
if err := srv.Send(hresp); err != nil {
|
||||||
return togRPCError(err)
|
return togRPCError(err)
|
||||||
}
|
}
|
||||||
@ -177,7 +183,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
|||||||
zap.Int64("total-bytes", total),
|
zap.Int64("total-bytes", total),
|
||||||
zap.String("size", size),
|
zap.String("size", size),
|
||||||
zap.String("took", humanize.Time(start)),
|
zap.String("took", humanize.Time(start)),
|
||||||
zap.String("etcd-version", version.Version),
|
zap.String("storage-version", storageVersion),
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -294,6 +294,9 @@ type EtcdServer struct {
|
|||||||
firstCommitInTermC chan struct{}
|
firstCommitInTermC chan struct{}
|
||||||
|
|
||||||
*AccessController
|
*AccessController
|
||||||
|
|
||||||
|
// Ensure that storage version is updated only once.
|
||||||
|
storageVersionUpdated sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
type backendHooks struct {
|
type backendHooks struct {
|
||||||
@ -2371,6 +2374,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
"saved snapshot",
|
"saved snapshot",
|
||||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||||
)
|
)
|
||||||
|
s.storageVersionUpdated.Do(func() {
|
||||||
|
err := serverversion.UpdateStorageVersion(s.lg, s.be.BatchTx())
|
||||||
|
if err != nil {
|
||||||
|
s.lg.Warn("failed to update storage version", zap.Error(err))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// When sending a snapshot, etcd will pause compaction.
|
// When sending a snapshot, etcd will pause compaction.
|
||||||
// After receives a snapshot, the slow follower needs to get all the entries right after
|
// After receives a snapshot, the slow follower needs to get all the entries right after
|
||||||
|
16
server/etcdserver/version/doc.go
Normal file
16
server/etcdserver/version/doc.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Package version provides functions for getting/saving storage version.
|
||||||
|
package version
|
108
server/etcdserver/version/version.go
Normal file
108
server/etcdserver/version/version.go
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package version
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
V3_5 = semver.Version{Major: 3, Minor: 5}
|
||||||
|
V3_6 = semver.Version{Major: 3, Minor: 6}
|
||||||
|
)
|
||||||
|
|
||||||
|
// UpdateStorageVersion updates storage version.
|
||||||
|
func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
|
v, err := detectStorageVersion(lg, tx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot determine storage version: %w", err)
|
||||||
|
}
|
||||||
|
switch *v {
|
||||||
|
case V3_5:
|
||||||
|
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
|
||||||
|
// All meta keys introduced in v3.6 should be filled in here.
|
||||||
|
unsafeSetStorageVersion(tx, &V3_6)
|
||||||
|
case V3_6:
|
||||||
|
default:
|
||||||
|
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
|
||||||
|
v := unsafeReadStorageVersion(tx)
|
||||||
|
if v != nil {
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
_, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
|
||||||
|
if len(cfs) == 0 {
|
||||||
|
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
|
||||||
|
}
|
||||||
|
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
|
||||||
|
if len(ts) == 0 {
|
||||||
|
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
|
||||||
|
}
|
||||||
|
copied := V3_5
|
||||||
|
return &copied, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadStorageVersion loads storage version from given backend transaction.
|
||||||
|
// Populated since v3.6
|
||||||
|
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
|
return unsafeReadStorageVersion(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsafeReadStorageVersion loads storage version from given backend transaction.
|
||||||
|
// Populated since v3.6
|
||||||
|
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||||
|
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1)
|
||||||
|
if len(vs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
v, err := semver.NewVersion(string(vs[0]))
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
|
||||||
|
// Populated since v3.6
|
||||||
|
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
|
||||||
|
v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName)
|
||||||
|
version, err := semver.NewVersion(string(v))
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return version
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsafeSetStorageVersion updates etcd storage version in backend.
|
||||||
|
// Populated since v3.6
|
||||||
|
func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
|
||||||
|
sv := semver.Version{Major: v.Major, Minor: v.Minor}
|
||||||
|
tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String()))
|
||||||
|
}
|
227
server/etcdserver/version/version_test.go
Normal file
227
server/etcdserver/version/version_test.go
Normal file
@ -0,0 +1,227 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package version
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
bolt "go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/api/v3/version"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
|
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUpdateStorageVersion(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
version string
|
||||||
|
metaKeys [][]byte
|
||||||
|
expectVersion *semver.Version
|
||||||
|
expectError bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedMetaKeys [][]byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: `Backend before 3.6 without "confState" should be rejected`,
|
||||||
|
version: "",
|
||||||
|
expectVersion: nil,
|
||||||
|
expectError: true,
|
||||||
|
expectedErrorMsg: `cannot determine storage version: missing "confState" key`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: `Backend before 3.6 without "term" should be rejected`,
|
||||||
|
version: "",
|
||||||
|
metaKeys: [][]byte{buckets.MetaConfStateName},
|
||||||
|
expectVersion: nil,
|
||||||
|
expectError: true,
|
||||||
|
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
|
||||||
|
expectedMetaKeys: [][]byte{buckets.MetaConfStateName},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
|
||||||
|
version: "",
|
||||||
|
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
|
||||||
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
|
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Backend in 3.6.0 should be skipped",
|
||||||
|
version: "3.6.0",
|
||||||
|
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||||
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
|
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Backend with current version should be skipped",
|
||||||
|
version: version.Version,
|
||||||
|
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||||
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
|
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Backend in 3.7.0 should be skipped",
|
||||||
|
version: "3.7.0",
|
||||||
|
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
|
||||||
|
expectVersion: &semver.Version{Major: 3, Minor: 7},
|
||||||
|
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||||
|
tx := be.BatchTx()
|
||||||
|
if tx == nil {
|
||||||
|
t.Fatal("batch tx is nil")
|
||||||
|
}
|
||||||
|
tx.Lock()
|
||||||
|
tx.UnsafeCreateBucket(buckets.Meta)
|
||||||
|
for _, k := range tc.metaKeys {
|
||||||
|
tx.UnsafePut(buckets.Meta, k, []byte{})
|
||||||
|
}
|
||||||
|
if tc.version != "" {
|
||||||
|
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||||
|
}
|
||||||
|
tx.Unlock()
|
||||||
|
be.ForceCommit()
|
||||||
|
be.Close()
|
||||||
|
|
||||||
|
b := backend.NewDefaultBackend(tmpPath)
|
||||||
|
defer b.Close()
|
||||||
|
err := UpdateStorageVersion(zap.NewNop(), b.BatchTx())
|
||||||
|
if (err != nil) != tc.expectError {
|
||||||
|
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
|
||||||
|
}
|
||||||
|
if err != nil && err.Error() != tc.expectedErrorMsg {
|
||||||
|
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
|
||||||
|
}
|
||||||
|
v := unsafeReadStorageVersion(b.BatchTx())
|
||||||
|
assert.Equal(t, tc.expectVersion, v)
|
||||||
|
keys, _ := b.BatchTx().UnsafeRange(buckets.Meta, []byte("a"), []byte("z"), 0)
|
||||||
|
assert.ElementsMatch(t, tc.expectedMetaKeys, keys)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestVersion ensures that unsafeSetStorageVersion/unsafeReadStorageVersion work well together.
|
||||||
|
func TestVersion(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
version string
|
||||||
|
expectVersion string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
version: "3.5.0",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-alpha",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-beta.0",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-rc.1",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.1",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.version, func(t *testing.T) {
|
||||||
|
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||||
|
tx := be.BatchTx()
|
||||||
|
if tx == nil {
|
||||||
|
t.Fatal("batch tx is nil")
|
||||||
|
}
|
||||||
|
tx.Lock()
|
||||||
|
tx.UnsafeCreateBucket(buckets.Meta)
|
||||||
|
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||||
|
tx.Unlock()
|
||||||
|
be.ForceCommit()
|
||||||
|
be.Close()
|
||||||
|
|
||||||
|
b := backend.NewDefaultBackend(tmpPath)
|
||||||
|
defer b.Close()
|
||||||
|
v := unsafeReadStorageVersion(b.BatchTx())
|
||||||
|
|
||||||
|
assert.Equal(t, tc.expectVersion, v.String())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestVersionSnapshot ensures that unsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
|
||||||
|
func TestVersionSnapshot(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
version string
|
||||||
|
expectVersion string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
version: "3.5.0",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-alpha",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-beta.0",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
version: "3.5.0-rc.1",
|
||||||
|
expectVersion: "3.5.0",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.version, func(t *testing.T) {
|
||||||
|
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||||
|
tx := be.BatchTx()
|
||||||
|
if tx == nil {
|
||||||
|
t.Fatal("batch tx is nil")
|
||||||
|
}
|
||||||
|
tx.Lock()
|
||||||
|
tx.UnsafeCreateBucket(buckets.Meta)
|
||||||
|
unsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||||
|
tx.Unlock()
|
||||||
|
be.ForceCommit()
|
||||||
|
be.Close()
|
||||||
|
db, err := bolt.Open(tmpPath, 0400, &bolt.Options{ReadOnly: true})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
var ver *semver.Version
|
||||||
|
if err = db.View(func(tx *bolt.Tx) error {
|
||||||
|
ver = ReadStorageVersionFromSnapshot(tx)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, tc.expectVersion, ver.String())
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -67,8 +67,14 @@ func (b bucket) String() string { return string(b.Name()) }
|
|||||||
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
|
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// Since v3.0
|
||||||
MetaConsistentIndexKeyName = []byte("consistent_index")
|
MetaConsistentIndexKeyName = []byte("consistent_index")
|
||||||
|
// Since v3.5
|
||||||
MetaTermKeyName = []byte("term")
|
MetaTermKeyName = []byte("term")
|
||||||
|
MetaConfStateName = []byte("confState")
|
||||||
|
// Since v3.6
|
||||||
|
MetaStorageVersionName = []byte("storageVersion")
|
||||||
|
// Before adding new meta key please update server/etcdserver/version
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultIgnores defines buckets & keys to ignore in hash checking.
|
// DefaultIgnores defines buckets & keys to ignore in hash checking.
|
||||||
|
@ -204,7 +204,6 @@ func testIssue6361(t *testing.T, etcdutl bool) {
|
|||||||
t.Log("etcdctl saving snapshot...")
|
t.Log("etcdctl saving snapshot...")
|
||||||
if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
|
if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
|
||||||
fmt.Sprintf("Snapshot saved at %s", fpath),
|
fmt.Sprintf("Snapshot saved at %s", fpath),
|
||||||
"Server version 3.6.0",
|
|
||||||
); err != nil {
|
); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -288,3 +287,31 @@ func testIssue6361(t *testing.T, etcdutl bool) {
|
|||||||
}
|
}
|
||||||
t.Log("Test logic done")
|
t.Log("Test logic done")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For storageVersion to be stored, all fields expected 3.6 fields need to be set. This happens after first WAL snapshot.
|
||||||
|
// In this test we lower snapshotCount to 1 to ensure WAL snapshot is triggered.
|
||||||
|
func TestCtlV3SnapshotVersion(t *testing.T) {
|
||||||
|
testCtl(t, snapshotVersionTest, withCfg(etcdProcessClusterConfig{snapshotCount: 1}))
|
||||||
|
}
|
||||||
|
func TestCtlV3SnapshotVersionEtcdutl(t *testing.T) {
|
||||||
|
testCtl(t, snapshotVersionTest, withEtcdutl(), withCfg(etcdProcessClusterConfig{snapshotCount: 1}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func snapshotVersionTest(cx ctlCtx) {
|
||||||
|
maintenanceInitKeys(cx)
|
||||||
|
|
||||||
|
fpath := filepath.Join(cx.t.TempDir(), "snapshot")
|
||||||
|
defer os.RemoveAll(fpath)
|
||||||
|
|
||||||
|
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
|
||||||
|
cx.t.Fatalf("snapshotVersionTest ctlV3SnapshotSave error (%v)", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
st, err := getSnapshotStatus(cx, fpath)
|
||||||
|
if err != nil {
|
||||||
|
cx.t.Fatalf("snapshotVersionTest getSnapshotStatus error (%v)", err)
|
||||||
|
}
|
||||||
|
if st.Version != "3.6.0" {
|
||||||
|
cx.t.Fatalf("expected %q, got %q", "3.6.0", st.Version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -249,7 +249,8 @@ func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Co
|
|||||||
func TestMaintenanceSnapshotWithVersionVersion(t *testing.T) {
|
func TestMaintenanceSnapshotWithVersionVersion(t *testing.T) {
|
||||||
integration.BeforeTest(t)
|
integration.BeforeTest(t)
|
||||||
|
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
// Set SnapshotCount to 1 to force raft snapshot to ensure that storage version is set
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, SnapshotCount: 1})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
// reading snapshot with canceled context should error out
|
// reading snapshot with canceled context should error out
|
||||||
@ -258,7 +259,7 @@ func TestMaintenanceSnapshotWithVersionVersion(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer resp.Snapshot.Close()
|
defer resp.Snapshot.Close()
|
||||||
if resp.Version != version.Version {
|
if resp.Version != "3.6.0" {
|
||||||
t.Errorf("unexpected version, expected %q, got %q", version.Version, resp.Version)
|
t.Errorf("unexpected version, expected %q, got %q", version.Version, resp.Version)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/version"
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||||
"go.etcd.io/etcd/client/v3"
|
"go.etcd.io/etcd/client/v3"
|
||||||
@ -39,7 +38,7 @@ import (
|
|||||||
func TestSaveSnapshotFilePermissions(t *testing.T) {
|
func TestSaveSnapshotFilePermissions(t *testing.T) {
|
||||||
expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
|
expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
|
||||||
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
||||||
_, dbPath := createSnapshotFile(t, kvs)
|
_, dbPath := createSnapshotFile(t, newEmbedConfig(t), kvs)
|
||||||
defer os.RemoveAll(dbPath)
|
defer os.RemoveAll(dbPath)
|
||||||
|
|
||||||
dbInfo, err := os.Stat(dbPath)
|
dbInfo, err := os.Stat(dbPath)
|
||||||
@ -53,14 +52,17 @@ func TestSaveSnapshotFilePermissions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSaveSnapshotVersion ensures that the snapshot returns proper etcd version.
|
// TestSaveSnapshotVersion ensures that the snapshot returns proper storage version.
|
||||||
func TestSaveSnapshotVersion(t *testing.T) {
|
func TestSaveSnapshotVersion(t *testing.T) {
|
||||||
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
||||||
ver, dbPath := createSnapshotFile(t, kvs)
|
cfg := newEmbedConfig(t)
|
||||||
|
// Force raft snapshot to ensure that storage version is set
|
||||||
|
cfg.SnapshotCount = 1
|
||||||
|
ver, dbPath := createSnapshotFile(t, cfg, kvs)
|
||||||
defer os.RemoveAll(dbPath)
|
defer os.RemoveAll(dbPath)
|
||||||
|
|
||||||
if ver != version.Version {
|
if ver != "3.6.0" {
|
||||||
t.Fatalf("expected snapshot version %s, got %s:", version.Version, ver)
|
t.Fatalf("expected snapshot version %s, got %s:", "3.6.0", ver)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,19 +70,23 @@ type kv struct {
|
|||||||
k, v string
|
k, v string
|
||||||
}
|
}
|
||||||
|
|
||||||
// creates a snapshot file and returns the file path.
|
func newEmbedConfig(t *testing.T) *embed.Config {
|
||||||
func createSnapshotFile(t *testing.T, kvs []kv) (version string, dbPath string) {
|
|
||||||
testutil.SkipTestIfShortMode(t,
|
|
||||||
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
|
||||||
clusterN := 1
|
clusterN := 1
|
||||||
urls := newEmbedURLs(clusterN * 2)
|
urls := newEmbedURLs(clusterN * 2)
|
||||||
cURLs, pURLs := urls[:clusterN], urls[clusterN:]
|
cURLs, pURLs := urls[:clusterN], urls[clusterN:]
|
||||||
|
|
||||||
cfg := integration.NewEmbedConfig(t, "default")
|
cfg := integration.NewEmbedConfig(t, "default")
|
||||||
cfg.ClusterState = "new"
|
cfg.ClusterState = "new"
|
||||||
cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
|
cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
|
||||||
cfg.LPUrls, cfg.APUrls = pURLs, pURLs
|
cfg.LPUrls, cfg.APUrls = pURLs, pURLs
|
||||||
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
|
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// creates a snapshot file and returns the file path.
|
||||||
|
func createSnapshotFile(t *testing.T, cfg *embed.Config, kvs []kv) (version string, dbPath string) {
|
||||||
|
testutil.SkipTestIfShortMode(t,
|
||||||
|
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||||
|
|
||||||
srv, err := embed.StartEtcd(cfg)
|
srv, err := embed.StartEtcd(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user