Merge pull request #13155 from serathius/meta

etcdserver: Move Read/Update methods on Meta bucket to one place
This commit is contained in:
Piotr Tabor 2021-07-05 14:15:17 +02:00 committed by GitHub
commit f4fad92e0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 337 additions and 272 deletions

View File

@ -31,7 +31,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
@ -326,8 +325,8 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
buckets.UnsafeCreateMetaBucket(tx)
buckets.UnsafeUpdateConsistentIndex(tx, idx, term, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -40,7 +40,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
@ -137,7 +136,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
v := version.ReadStorageVersionFromSnapshot(tx)
v := buckets.ReadStorageVersionFromSnapshot(tx)
if v != nil {
ds.Version = v.String()
}

View File

@ -27,9 +27,9 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -101,7 +101,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
const snapshotSendBufferSize = 32 * 1024
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx())
ver := buckets.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
storageVersion = ver.String()

View File

@ -22,8 +22,8 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = buckets.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil

View File

@ -15,7 +15,6 @@
package cindex
import (
"encoding/binary"
"sync"
"sync/atomic"
@ -74,7 +73,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v, term := ReadConsistentIndex(ci.be.BatchTx())
v, term := buckets.ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
@ -87,7 +86,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
buckets.UnsafeUpdateConsistentIndex(tx, index, term, true)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -117,73 +116,8 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(buckets.Meta)
}
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Meta)
}
// unsafeGetConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2)
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
buckets.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
// TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together.
@ -36,7 +37,7 @@ func TestConsistentIndex(t *testing.T) {
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
buckets.UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := uint64(7890123)

View File

@ -386,7 +386,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
be := openBackend(cfg, beHooks)
ci.SetBackend(be)
cindex.CreateMetaBucket(be.BatchTx())
buckets.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
err := maybeDefragBackend(cfg, be)

View File

@ -651,7 +651,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
cindex.CreateMetaBucket(be.BatchTx())
buckets.CreateMetaBucket(be.BatchTx())
ci := cindex.NewConsistentIndex(be)
srv := &EtcdServer{
@ -698,7 +698,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
rindex, rterm := buckets.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

View File

@ -18,7 +18,6 @@ import (
"fmt"
"github.com/coreos/go-semver/semver"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.etcd.io/etcd/server/v3/mvcc/backend"
@ -42,7 +41,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
unsafeSetStorageVersion(tx, &V3_6)
buckets.UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
@ -51,58 +50,18 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
}
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
v := unsafeReadStorageVersion(tx)
v := buckets.UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
}
_, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
if len(cfs) == 0 {
confstate := buckets.UnsafeConfStateFromBackend(lg, tx)
if confstate == nil {
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
}
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
_, term := buckets.UnsafeReadConsistentIndex(tx)
if term == 0 {
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
}
copied := V3_5
return &copied, nil
}
// ReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
tx.Lock()
defer tx.Unlock()
return unsafeReadStorageVersion(tx)
}
// unsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil
}
v, err := semver.NewVersion(string(vs[0]))
if err != nil {
return nil
}
return v
}
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
// Populated since v3.6
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName)
version, err := semver.NewVersion(string(v))
if err != nil {
return nil
}
return version
}
// unsafeSetStorageVersion updates etcd storage version in backend.
// Populated since v3.6
func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
sv := semver.Version{Major: v.Major, Minor: v.Minor}
tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String()))
}

View File

@ -20,13 +20,12 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
func TestUpdateStorageVersion(t *testing.T) {
@ -37,7 +36,6 @@ func TestUpdateStorageVersion(t *testing.T) {
expectVersion *semver.Version
expectError bool
expectedErrorMsg string
expectedMetaKeys [][]byte
}{
{
name: `Backend before 3.6 without "confState" should be rejected`,
@ -53,51 +51,54 @@ func TestUpdateStorageVersion(t *testing.T) {
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
expectedMetaKeys: [][]byte{buckets.MetaConfStateName},
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
buckets.UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
tx.UnsafePut(buckets.Meta, k, []byte{})
switch string(k) {
case string(buckets.MetaConfStateName):
buckets.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(buckets.MetaTermKeyName):
buckets.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
default:
tx.UnsafePut(buckets.Meta, k, []byte{})
}
}
if tc.version != "" {
unsafeSetStorageVersion(tx, semver.New(tc.version))
buckets.UnsafeSetStorageVersion(tx, semver.New(tc.version))
}
tx.Unlock()
be.ForceCommit()
@ -105,123 +106,15 @@ func TestUpdateStorageVersion(t *testing.T) {
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
err := UpdateStorageVersion(zap.NewNop(), b.BatchTx())
err := UpdateStorageVersion(lg, b.BatchTx())
if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
}
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
}
v := unsafeReadStorageVersion(b.BatchTx())
v := buckets.UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
keys, _ := b.BatchTx().UnsafeRange(buckets.Meta, []byte("a"), []byte("z"), 0)
assert.ElementsMatch(t, tc.expectedMetaKeys, keys)
})
}
}
// TestVersion ensures that unsafeSetStorageVersion/unsafeReadStorageVersion work well together.
func TestVersion(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
{
version: "3.5.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
unsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := unsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v.String())
})
}
}
// TestVersionSnapshot ensures that unsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
func TestVersionSnapshot(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
unsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
db, err := bolt.Open(tmpPath, 0400, &bolt.Options{ReadOnly: true})
if err != nil {
t.Fatal(err)
}
defer db.Close()
var ver *semver.Version
if err = db.View(func(tx *bolt.Tx) error {
ver = ReadStorageVersionFromSnapshot(tx)
return nil
}); err != nil {
t.Fatal(err)
}
assert.Equal(t, tc.expectVersion, ver.String())
})
}
}

View File

@ -0,0 +1,85 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"encoding/binary"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Meta)
}
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(Meta)
}
// UnsafeReadConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
_, ts := tx.UnsafeRange(Meta, MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return UnsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
}
}

View File

@ -29,7 +29,7 @@ func TestConfStateFromBackendInOneTx(t *testing.T) {
defer betesting.Close(t, be)
tx := be.BatchTx()
tx.UnsafeCreateBucket(Meta)
CreateMetaBucket(tx)
tx.Lock()
defer tx.Unlock()
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
@ -47,7 +47,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
{
tx := be.BatchTx()
tx.UnsafeCreateBucket(Meta)
CreateMetaBucket(tx)
tx.Commit()
}

View File

@ -0,0 +1,61 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
// ReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
tx.Lock()
defer tx.Unlock()
return UnsafeReadStorageVersion(tx)
}
// UnsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func UnsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
_, vs := tx.UnsafeRange(Meta, MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil
}
v, err := semver.NewVersion(string(vs[0]))
if err != nil {
return nil
}
return v
}
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
// Populated since v3.6
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
v := tx.Bucket(Meta.Name()).Get(MetaStorageVersionName)
version, err := semver.NewVersion(string(v))
if err != nil {
return nil
}
return version
}
// UnsafeSetStorageVersion updates etcd storage version in backend.
// Populated since v3.6
func UnsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
sv := semver.Version{Major: v.Major, Minor: v.Minor}
tx.UnsafePut(Meta, MetaStorageVersionName, []byte(sv.String()))
}

View File

@ -0,0 +1,133 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
// TestVersion ensures that UnsafeSetStorageVersion/UnsafeReadStorageVersion work well together.
func TestVersion(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
{
version: "3.5.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(Meta)
UnsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v.String())
})
}
}
// TestVersionSnapshot ensures that UnsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
func TestVersionSnapshot(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(Meta)
UnsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
db, err := bbolt.Open(tmpPath, 0400, &bbolt.Options{ReadOnly: true})
if err != nil {
t.Fatal(err)
}
defer db.Close()
var ver *semver.Version
if err = db.View(func(tx *bbolt.Tx) error {
ver = ReadStorageVersionFromSnapshot(tx)
return nil
}); err != nil {
t.Fatal(err)
}
assert.Equal(t, tc.expectVersion, ver.String())
})
}
}

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -83,7 +84,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
tx := be.BatchTx()
tx.Lock()
cindex.UnsafeCreateMetaBucket(tx)
buckets.UnsafeCreateMetaBucket(tx)
ci.UnsafeSave(tx)
tx.Unlock()

View File

@ -20,8 +20,8 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/datadir"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
wal2 "go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"
@ -109,7 +109,7 @@ func MustVerifyIfEnabled(cfg Config) {
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
index, term := cindex.ReadConsistentIndex(tx)
index, term := buckets.ReadConsistentIndex(tx)
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}