etcdserver: Move Read/Update methods on Meta bucket to one place

There are still some left like compact keys, but they will require more
work to avoid circular dependency.
This commit is contained in:
Marek Siarkowicz 2021-06-29 10:57:26 +02:00
parent 14c527f59a
commit bf3e7033e9
16 changed files with 337 additions and 272 deletions

View File

@ -31,7 +31,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
@ -326,8 +325,8 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
buckets.UnsafeCreateMetaBucket(tx)
buckets.UnsafeUpdateConsistentIndex(tx, idx, term, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -40,7 +40,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
@ -137,7 +136,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
v := version.ReadStorageVersionFromSnapshot(tx)
v := buckets.ReadStorageVersionFromSnapshot(tx)
if v != nil {
ds.Version = v.String()
}

View File

@ -27,9 +27,9 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -101,7 +101,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
const snapshotSendBufferSize = 32 * 1024
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx())
ver := buckets.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
storageVersion = ver.String()

View File

@ -22,8 +22,8 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = buckets.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil

View File

@ -15,7 +15,6 @@
package cindex
import (
"encoding/binary"
"sync"
"sync/atomic"
@ -74,7 +73,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v, term := ReadConsistentIndex(ci.be.BatchTx())
v, term := buckets.ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
@ -87,7 +86,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
buckets.UnsafeUpdateConsistentIndex(tx, index, term, true)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -117,73 +116,8 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(buckets.Meta)
}
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Meta)
}
// unsafeGetConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2)
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
buckets.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
// TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together.
@ -36,7 +37,7 @@ func TestConsistentIndex(t *testing.T) {
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
buckets.UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := uint64(7890123)

View File

@ -386,7 +386,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
be := openBackend(cfg, beHooks)
ci.SetBackend(be)
cindex.CreateMetaBucket(be.BatchTx())
buckets.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
err := maybeDefragBackend(cfg, be)

View File

@ -651,7 +651,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
cindex.CreateMetaBucket(be.BatchTx())
buckets.CreateMetaBucket(be.BatchTx())
ci := cindex.NewConsistentIndex(be)
srv := &EtcdServer{
@ -698,7 +698,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
rindex, rterm := buckets.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

View File

@ -18,7 +18,6 @@ import (
"fmt"
"github.com/coreos/go-semver/semver"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.etcd.io/etcd/server/v3/mvcc/backend"
@ -42,7 +41,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
unsafeSetStorageVersion(tx, &V3_6)
buckets.UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
@ -51,58 +50,18 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
}
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
v := unsafeReadStorageVersion(tx)
v := buckets.UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
}
_, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
if len(cfs) == 0 {
confstate := buckets.UnsafeConfStateFromBackend(lg, tx)
if confstate == nil {
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
}
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
_, term := buckets.UnsafeReadConsistentIndex(tx)
if term == 0 {
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
}
copied := V3_5
return &copied, nil
}
// ReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
tx.Lock()
defer tx.Unlock()
return unsafeReadStorageVersion(tx)
}
// unsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil
}
v, err := semver.NewVersion(string(vs[0]))
if err != nil {
return nil
}
return v
}
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
// Populated since v3.6
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName)
version, err := semver.NewVersion(string(v))
if err != nil {
return nil
}
return version
}
// unsafeSetStorageVersion updates etcd storage version in backend.
// Populated since v3.6
func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
sv := semver.Version{Major: v.Major, Minor: v.Minor}
tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String()))
}

View File

@ -20,13 +20,12 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
func TestUpdateStorageVersion(t *testing.T) {
@ -37,7 +36,6 @@ func TestUpdateStorageVersion(t *testing.T) {
expectVersion *semver.Version
expectError bool
expectedErrorMsg string
expectedMetaKeys [][]byte
}{
{
name: `Backend before 3.6 without "confState" should be rejected`,
@ -53,51 +51,54 @@ func TestUpdateStorageVersion(t *testing.T) {
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
expectedMetaKeys: [][]byte{buckets.MetaConfStateName},
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
buckets.UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
tx.UnsafePut(buckets.Meta, k, []byte{})
switch string(k) {
case string(buckets.MetaConfStateName):
buckets.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(buckets.MetaTermKeyName):
buckets.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
default:
tx.UnsafePut(buckets.Meta, k, []byte{})
}
}
if tc.version != "" {
unsafeSetStorageVersion(tx, semver.New(tc.version))
buckets.UnsafeSetStorageVersion(tx, semver.New(tc.version))
}
tx.Unlock()
be.ForceCommit()
@ -105,123 +106,15 @@ func TestUpdateStorageVersion(t *testing.T) {
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
err := UpdateStorageVersion(zap.NewNop(), b.BatchTx())
err := UpdateStorageVersion(lg, b.BatchTx())
if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
}
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
}
v := unsafeReadStorageVersion(b.BatchTx())
v := buckets.UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
keys, _ := b.BatchTx().UnsafeRange(buckets.Meta, []byte("a"), []byte("z"), 0)
assert.ElementsMatch(t, tc.expectedMetaKeys, keys)
})
}
}
// TestVersion ensures that unsafeSetStorageVersion/unsafeReadStorageVersion work well together.
func TestVersion(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
{
version: "3.5.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
unsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := unsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v.String())
})
}
}
// TestVersionSnapshot ensures that unsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
func TestVersionSnapshot(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
unsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
db, err := bolt.Open(tmpPath, 0400, &bolt.Options{ReadOnly: true})
if err != nil {
t.Fatal(err)
}
defer db.Close()
var ver *semver.Version
if err = db.View(func(tx *bolt.Tx) error {
ver = ReadStorageVersionFromSnapshot(tx)
return nil
}); err != nil {
t.Fatal(err)
}
assert.Equal(t, tc.expectVersion, ver.String())
})
}
}

View File

@ -0,0 +1,85 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"encoding/binary"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Meta)
}
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(Meta)
}
// UnsafeReadConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
_, ts := tx.UnsafeRange(Meta, MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return UnsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
}
}

View File

@ -29,7 +29,7 @@ func TestConfStateFromBackendInOneTx(t *testing.T) {
defer betesting.Close(t, be)
tx := be.BatchTx()
tx.UnsafeCreateBucket(Meta)
CreateMetaBucket(tx)
tx.Lock()
defer tx.Unlock()
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
@ -47,7 +47,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
{
tx := be.BatchTx()
tx.UnsafeCreateBucket(Meta)
CreateMetaBucket(tx)
tx.Commit()
}

View File

@ -0,0 +1,61 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
// ReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
tx.Lock()
defer tx.Unlock()
return UnsafeReadStorageVersion(tx)
}
// UnsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func UnsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
_, vs := tx.UnsafeRange(Meta, MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil
}
v, err := semver.NewVersion(string(vs[0]))
if err != nil {
return nil
}
return v
}
// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
// Populated since v3.6
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
v := tx.Bucket(Meta.Name()).Get(MetaStorageVersionName)
version, err := semver.NewVersion(string(v))
if err != nil {
return nil
}
return version
}
// UnsafeSetStorageVersion updates etcd storage version in backend.
// Populated since v3.6
func UnsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
sv := semver.Version{Major: v.Major, Minor: v.Minor}
tx.UnsafePut(Meta, MetaStorageVersionName, []byte(sv.String()))
}

View File

@ -0,0 +1,133 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
// TestVersion ensures that UnsafeSetStorageVersion/UnsafeReadStorageVersion work well together.
func TestVersion(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
{
version: "3.5.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(Meta)
UnsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v.String())
})
}
}
// TestVersionSnapshot ensures that UnsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together.
func TestVersionSnapshot(t *testing.T) {
tcs := []struct {
version string
expectVersion string
}{
{
version: "3.5.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-alpha",
expectVersion: "3.5.0",
},
{
version: "3.5.0-beta.0",
expectVersion: "3.5.0",
},
{
version: "3.5.0-rc.1",
expectVersion: "3.5.0",
},
}
for _, tc := range tcs {
t.Run(tc.version, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(Meta)
UnsafeSetStorageVersion(tx, semver.New(tc.version))
tx.Unlock()
be.ForceCommit()
be.Close()
db, err := bbolt.Open(tmpPath, 0400, &bbolt.Options{ReadOnly: true})
if err != nil {
t.Fatal(err)
}
defer db.Close()
var ver *semver.Version
if err = db.View(func(tx *bbolt.Tx) error {
ver = ReadStorageVersionFromSnapshot(tx)
return nil
}); err != nil {
t.Fatal(err)
}
assert.Equal(t, tc.expectVersion, ver.String())
})
}
}

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -83,7 +84,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
tx := be.BatchTx()
tx.Lock()
cindex.UnsafeCreateMetaBucket(tx)
buckets.UnsafeCreateMetaBucket(tx)
ci.UnsafeSave(tx)
tx.Unlock()

View File

@ -20,8 +20,8 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/datadir"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
wal2 "go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"
@ -109,7 +109,7 @@ func MustVerifyIfEnabled(cfg Config) {
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
index, term := cindex.ReadConsistentIndex(tx)
index, term := buckets.ReadConsistentIndex(tx)
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}