From bf3e7033e9300443bf78fc732f0be1ae2c553cb0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 29 Jun 2021 10:57:26 +0200 Subject: [PATCH] etcdserver: Move Read/Update methods on Meta bucket to one place There are still some left like compact keys, but they will require more work to avoid circular dependency. --- etcdutl/etcdutl/backup_command.go | 5 +- etcdutl/snapshot/v3_snapshot.go | 3 +- server/etcdserver/api/v3rpc/maintenance.go | 4 +- server/etcdserver/backend.go | 4 +- server/etcdserver/cindex/cindex.go | 72 +-------- server/etcdserver/cindex/cindex_test.go | 3 +- server/etcdserver/server.go | 2 +- server/etcdserver/server_test.go | 4 +- server/etcdserver/version/version.go | 53 +------ server/etcdserver/version/version_test.go | 169 ++++----------------- server/mvcc/buckets/cindex.go | 85 +++++++++++ server/mvcc/buckets/confstate_test.go | 4 +- server/mvcc/buckets/version.go | 61 ++++++++ server/mvcc/buckets/version_test.go | 133 ++++++++++++++++ server/mvcc/kvstore_bench_test.go | 3 +- server/verify/verify.go | 4 +- 16 files changed, 337 insertions(+), 272 deletions(-) create mode 100644 server/mvcc/buckets/cindex.go create mode 100644 server/mvcc/buckets/version.go create mode 100644 server/mvcc/buckets/version_test.go diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index b55b32b51..045d1e7a7 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -31,7 +31,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/verify" @@ -326,8 +325,8 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir tx := be.BatchTx() tx.Lock() defer tx.Unlock() - cindex.UnsafeCreateMetaBucket(tx) - cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false) + buckets.UnsafeCreateMetaBucket(tx) + buckets.UnsafeUpdateConsistentIndex(tx, idx, term, false) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 5ce78a260..07886d0f6 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -40,7 +40,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" - "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/verify" @@ -137,7 +136,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) { return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings)) } ds.TotalSize = tx.Size() - v := version.ReadStorageVersionFromSnapshot(tx) + v := buckets.ReadStorageVersionFromSnapshot(tx) if v != nil { ds.Version = v.String() } diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 01362b0e3..82085c546 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -27,9 +27,9 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver" - serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -101,7 +101,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe const snapshotSendBufferSize = 32 * 1024 func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { - ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx()) + ver := buckets.ReadStorageVersion(ms.bg.Backend().ReadTx()) storageVersion := "" if ver != nil { storageVersion = ver.String() diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 081be2b52..3e95a8ccb 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -22,8 +22,8 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = buckets.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 64b98b6ff..492178a5f 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -15,7 +15,6 @@ package cindex import ( - "encoding/binary" "sync" "sync/atomic" @@ -74,7 +73,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := ReadConsistentIndex(ci.be.BatchTx()) + v, term := buckets.ReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -87,7 +86,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) term := atomic.LoadUint64(&ci.term) - UnsafeUpdateConsistentIndex(tx, index, term, true) + buckets.UnsafeUpdateConsistentIndex(tx, index, term, true) } func (ci *consistentIndex) SetBackend(be Backend) { @@ -117,73 +116,8 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} -// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet). -func UnsafeCreateMetaBucket(tx backend.BatchTx) { - tx.UnsafeCreateBucket(buckets.Meta) -} - -// CreateMetaBucket creates the `meta` bucket (if it does not exists yet). -func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() - defer tx.Unlock() - tx.UnsafeCreateBucket(buckets.Meta) -} - -// unsafeGetConsistentIndex loads consistent index & term from given transaction. -// returns 0,0 if the data are not found. -// Term is persisted since v3.5. -func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - _, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0) - if len(vs) == 0 { - return 0, 0 - } - v := binary.BigEndian.Uint64(vs[0]) - _, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0) - if len(ts) == 0 { - return v, 0 - } - t := binary.BigEndian.Uint64(ts[0]) - return v, t -} - -// ReadConsistentIndex loads consistent index and term from given transaction. -// returns 0 if the data are not found. -func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() - return unsafeReadConsistentIndex(tx) -} - -func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - if index == 0 { - // Never save 0 as it means that we didn't loaded the real index yet. - return - } - - if onlyGrow { - oldi, oldTerm := unsafeReadConsistentIndex(tx) - if term < oldTerm { - return - } - if term == oldTerm && index <= oldi { - return - } - } - - bs1 := make([]byte, 8) - binary.BigEndian.PutUint64(bs1, index) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1) - if term > 0 { - bs2 := make([]byte, 8) - binary.BigEndian.PutUint64(bs2, term) - tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2) - } -} - func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { tx.Lock() defer tx.Unlock() - UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) + buckets.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index 1e111b9e8..8a57fac7b 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) // TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together. @@ -36,7 +37,7 @@ func TestConsistentIndex(t *testing.T) { } tx.Lock() - UnsafeCreateMetaBucket(tx) + buckets.UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() r := uint64(7890123) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 9c907e143..19916f800 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -386,7 +386,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { beHooks := &backendHooks{lg: cfg.Logger, indexer: ci} be := openBackend(cfg, beHooks) ci.SetBackend(be) - cindex.CreateMetaBucket(be.BatchTx()) + buckets.CreateMetaBucket(be.BatchTx()) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { err := maybeDefragBackend(cfg, be) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 2da93b08b..e6499b5d1 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -651,7 +651,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) - cindex.CreateMetaBucket(be.BatchTx()) + buckets.CreateMetaBucket(be.BatchTx()) ci := cindex.NewConsistentIndex(be) srv := &EtcdServer{ @@ -698,7 +698,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) + rindex, rterm := buckets.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) assert.Equal(t, uint64(4), rterm) } diff --git a/server/etcdserver/version/version.go b/server/etcdserver/version/version.go index dd3fbae44..4caa94b59 100644 --- a/server/etcdserver/version/version.go +++ b/server/etcdserver/version/version.go @@ -18,7 +18,6 @@ import ( "fmt" "github.com/coreos/go-semver/semver" - "go.etcd.io/bbolt" "go.uber.org/zap" "go.etcd.io/etcd/server/v3/mvcc/backend" @@ -42,7 +41,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error { case V3_5: lg.Warn("setting storage version", zap.String("storage-version", V3_6.String())) // All meta keys introduced in v3.6 should be filled in here. - unsafeSetStorageVersion(tx, &V3_6) + buckets.UnsafeSetStorageVersion(tx, &V3_6) case V3_6: default: lg.Warn("unknown storage version", zap.String("storage-version", v.String())) @@ -51,58 +50,18 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error { } func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) { - v := unsafeReadStorageVersion(tx) + v := buckets.UnsafeReadStorageVersion(tx) if v != nil { return v, nil } - _, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0) - if len(cfs) == 0 { + confstate := buckets.UnsafeConfStateFromBackend(lg, tx) + if confstate == nil { return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName) } - _, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0) - if len(ts) == 0 { + _, term := buckets.UnsafeReadConsistentIndex(tx) + if term == 0 { return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName) } copied := V3_5 return &copied, nil } - -// ReadStorageVersion loads storage version from given backend transaction. -// Populated since v3.6 -func ReadStorageVersion(tx backend.ReadTx) *semver.Version { - tx.Lock() - defer tx.Unlock() - return unsafeReadStorageVersion(tx) -} - -// unsafeReadStorageVersion loads storage version from given backend transaction. -// Populated since v3.6 -func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version { - _, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1) - if len(vs) == 0 { - return nil - } - v, err := semver.NewVersion(string(vs[0])) - if err != nil { - return nil - } - return v -} - -// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction. -// Populated since v3.6 -func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version { - v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName) - version, err := semver.NewVersion(string(v)) - if err != nil { - return nil - } - return version -} - -// unsafeSetStorageVersion updates etcd storage version in backend. -// Populated since v3.6 -func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) { - sv := semver.Version{Major: v.Major, Minor: v.Minor} - tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String())) -} diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index c813a108c..405aa2b56 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -20,13 +20,12 @@ import ( "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" - bolt "go.etcd.io/bbolt" - "go.uber.org/zap" - "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" "go.etcd.io/etcd/server/v3/mvcc/buckets" + "go.uber.org/zap" ) func TestUpdateStorageVersion(t *testing.T) { @@ -37,7 +36,6 @@ func TestUpdateStorageVersion(t *testing.T) { expectVersion *semver.Version expectError bool expectedErrorMsg string - expectedMetaKeys [][]byte }{ { name: `Backend before 3.6 without "confState" should be rejected`, @@ -53,51 +51,54 @@ func TestUpdateStorageVersion(t *testing.T) { expectVersion: nil, expectError: true, expectedErrorMsg: `cannot determine storage version: missing "term" key`, - expectedMetaKeys: [][]byte{buckets.MetaConfStateName}, }, { - name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", - version: "", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, - expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", + version: "", + metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName}, + expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { - name: "Backend in 3.6.0 should be skipped", - version: "3.6.0", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, - expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + name: "Backend in 3.6.0 should be skipped", + version: "3.6.0", + metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { - name: "Backend with current version should be skipped", - version: version.Version, - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, - expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + name: "Backend with current version should be skipped", + version: version.Version, + metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { - name: "Backend in 3.7.0 should be skipped", - version: "3.7.0", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")}, - expectVersion: &semver.Version{Major: 3, Minor: 7}, - expectedMetaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")}, + name: "Backend in 3.7.0 should be skipped", + version: "3.7.0", + metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")}, + expectVersion: &semver.Version{Major: 3, Minor: 7}, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) tx := be.BatchTx() if tx == nil { t.Fatal("batch tx is nil") } tx.Lock() - tx.UnsafeCreateBucket(buckets.Meta) + buckets.UnsafeCreateMetaBucket(tx) for _, k := range tc.metaKeys { - tx.UnsafePut(buckets.Meta, k, []byte{}) + switch string(k) { + case string(buckets.MetaConfStateName): + buckets.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{}) + case string(buckets.MetaTermKeyName): + buckets.UnsafeUpdateConsistentIndex(tx, 1, 1, false) + default: + tx.UnsafePut(buckets.Meta, k, []byte{}) + } } if tc.version != "" { - unsafeSetStorageVersion(tx, semver.New(tc.version)) + buckets.UnsafeSetStorageVersion(tx, semver.New(tc.version)) } tx.Unlock() be.ForceCommit() @@ -105,123 +106,15 @@ func TestUpdateStorageVersion(t *testing.T) { b := backend.NewDefaultBackend(tmpPath) defer b.Close() - err := UpdateStorageVersion(zap.NewNop(), b.BatchTx()) + err := UpdateStorageVersion(lg, b.BatchTx()) if (err != nil) != tc.expectError { t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError) } if err != nil && err.Error() != tc.expectedErrorMsg { t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg) } - v := unsafeReadStorageVersion(b.BatchTx()) + v := buckets.UnsafeReadStorageVersion(b.BatchTx()) assert.Equal(t, tc.expectVersion, v) - keys, _ := b.BatchTx().UnsafeRange(buckets.Meta, []byte("a"), []byte("z"), 0) - assert.ElementsMatch(t, tc.expectedMetaKeys, keys) - }) - } -} - -// TestVersion ensures that unsafeSetStorageVersion/unsafeReadStorageVersion work well together. -func TestVersion(t *testing.T) { - tcs := []struct { - version string - expectVersion string - }{ - { - version: "3.5.0", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-alpha", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-beta.0", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-rc.1", - expectVersion: "3.5.0", - }, - { - version: "3.5.1", - expectVersion: "3.5.0", - }, - } - for _, tc := range tcs { - t.Run(tc.version, func(t *testing.T) { - be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) - tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } - tx.Lock() - tx.UnsafeCreateBucket(buckets.Meta) - unsafeSetStorageVersion(tx, semver.New(tc.version)) - tx.Unlock() - be.ForceCommit() - be.Close() - - b := backend.NewDefaultBackend(tmpPath) - defer b.Close() - v := unsafeReadStorageVersion(b.BatchTx()) - - assert.Equal(t, tc.expectVersion, v.String()) - }) - } -} - -// TestVersionSnapshot ensures that unsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together. -func TestVersionSnapshot(t *testing.T) { - tcs := []struct { - version string - expectVersion string - }{ - { - version: "3.5.0", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-alpha", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-beta.0", - expectVersion: "3.5.0", - }, - { - version: "3.5.0-rc.1", - expectVersion: "3.5.0", - }, - } - for _, tc := range tcs { - t.Run(tc.version, func(t *testing.T) { - be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) - tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } - tx.Lock() - tx.UnsafeCreateBucket(buckets.Meta) - unsafeSetStorageVersion(tx, semver.New(tc.version)) - tx.Unlock() - be.ForceCommit() - be.Close() - db, err := bolt.Open(tmpPath, 0400, &bolt.Options{ReadOnly: true}) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - var ver *semver.Version - if err = db.View(func(tx *bolt.Tx) error { - ver = ReadStorageVersionFromSnapshot(tx) - return nil - }); err != nil { - t.Fatal(err) - } - - assert.Equal(t, tc.expectVersion, ver.String()) - }) } } diff --git a/server/mvcc/buckets/cindex.go b/server/mvcc/buckets/cindex.go new file mode 100644 index 000000000..097a87b9e --- /dev/null +++ b/server/mvcc/buckets/cindex.go @@ -0,0 +1,85 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "encoding/binary" + "go.etcd.io/etcd/server/v3/mvcc/backend" +) + +// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet). +func UnsafeCreateMetaBucket(tx backend.BatchTx) { + tx.UnsafeCreateBucket(Meta) +} + +// CreateMetaBucket creates the `meta` bucket (if it does not exists yet). +func CreateMetaBucket(tx backend.BatchTx) { + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket(Meta) +} + +// UnsafeReadConsistentIndex loads consistent index & term from given transaction. +// returns 0,0 if the data are not found. +// Term is persisted since v3.5. +func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { + _, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0) + if len(vs) == 0 { + return 0, 0 + } + v := binary.BigEndian.Uint64(vs[0]) + _, ts := tx.UnsafeRange(Meta, MetaTermKeyName, nil, 0) + if len(ts) == 0 { + return v, 0 + } + t := binary.BigEndian.Uint64(ts[0]) + return v, t +} + +// ReadConsistentIndex loads consistent index and term from given transaction. +// returns 0 if the data are not found. +func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { + tx.Lock() + defer tx.Unlock() + return UnsafeReadConsistentIndex(tx) +} + +func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { + if index == 0 { + // Never save 0 as it means that we didn't loaded the real index yet. + return + } + + if onlyGrow { + oldi, oldTerm := UnsafeReadConsistentIndex(tx) + if term < oldTerm { + return + } + if term == oldTerm && index <= oldi { + return + } + } + + bs1 := make([]byte, 8) + binary.BigEndian.PutUint64(bs1, index) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1) + if term > 0 { + bs2 := make([]byte, 8) + binary.BigEndian.PutUint64(bs2, term) + tx.UnsafePut(Meta, MetaTermKeyName, bs2) + } +} diff --git a/server/mvcc/buckets/confstate_test.go b/server/mvcc/buckets/confstate_test.go index d33599ef4..e70217313 100644 --- a/server/mvcc/buckets/confstate_test.go +++ b/server/mvcc/buckets/confstate_test.go @@ -29,7 +29,7 @@ func TestConfStateFromBackendInOneTx(t *testing.T) { defer betesting.Close(t, be) tx := be.BatchTx() - tx.UnsafeCreateBucket(Meta) + CreateMetaBucket(tx) tx.Lock() defer tx.Unlock() assert.Nil(t, UnsafeConfStateFromBackend(lg, tx)) @@ -47,7 +47,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) { { tx := be.BatchTx() - tx.UnsafeCreateBucket(Meta) + CreateMetaBucket(tx) tx.Commit() } diff --git a/server/mvcc/buckets/version.go b/server/mvcc/buckets/version.go new file mode 100644 index 000000000..261e6f891 --- /dev/null +++ b/server/mvcc/buckets/version.go @@ -0,0 +1,61 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "github.com/coreos/go-semver/semver" + "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/mvcc/backend" +) + +// ReadStorageVersion loads storage version from given backend transaction. +// Populated since v3.6 +func ReadStorageVersion(tx backend.ReadTx) *semver.Version { + tx.Lock() + defer tx.Unlock() + return UnsafeReadStorageVersion(tx) +} + +// UnsafeReadStorageVersion loads storage version from given backend transaction. +// Populated since v3.6 +func UnsafeReadStorageVersion(tx backend.ReadTx) *semver.Version { + _, vs := tx.UnsafeRange(Meta, MetaStorageVersionName, nil, 1) + if len(vs) == 0 { + return nil + } + v, err := semver.NewVersion(string(vs[0])) + if err != nil { + return nil + } + return v +} + +// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction. +// Populated since v3.6 +func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version { + v := tx.Bucket(Meta.Name()).Get(MetaStorageVersionName) + version, err := semver.NewVersion(string(v)) + if err != nil { + return nil + } + return version +} + +// UnsafeSetStorageVersion updates etcd storage version in backend. +// Populated since v3.6 +func UnsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) { + sv := semver.Version{Major: v.Major, Minor: v.Minor} + tx.UnsafePut(Meta, MetaStorageVersionName, []byte(sv.String())) +} diff --git a/server/mvcc/buckets/version_test.go b/server/mvcc/buckets/version_test.go new file mode 100644 index 000000000..cc6f6469a --- /dev/null +++ b/server/mvcc/buckets/version_test.go @@ -0,0 +1,133 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "testing" + "time" + + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" + "go.etcd.io/bbolt" + + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/backend/testing" +) + +// TestVersion ensures that UnsafeSetStorageVersion/UnsafeReadStorageVersion work well together. +func TestVersion(t *testing.T) { + tcs := []struct { + version string + expectVersion string + }{ + { + version: "3.5.0", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-alpha", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-beta.0", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-rc.1", + expectVersion: "3.5.0", + }, + { + version: "3.5.1", + expectVersion: "3.5.0", + }, + } + for _, tc := range tcs { + t.Run(tc.version, func(t *testing.T) { + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + tx.UnsafeCreateBucket(Meta) + UnsafeSetStorageVersion(tx, semver.New(tc.version)) + tx.Unlock() + be.ForceCommit() + be.Close() + + b := backend.NewDefaultBackend(tmpPath) + defer b.Close() + v := UnsafeReadStorageVersion(b.BatchTx()) + + assert.Equal(t, tc.expectVersion, v.String()) + }) + } +} + +// TestVersionSnapshot ensures that UnsafeSetStorageVersion/unsafeReadStorageVersionFromSnapshot work well together. +func TestVersionSnapshot(t *testing.T) { + tcs := []struct { + version string + expectVersion string + }{ + { + version: "3.5.0", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-alpha", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-beta.0", + expectVersion: "3.5.0", + }, + { + version: "3.5.0-rc.1", + expectVersion: "3.5.0", + }, + } + for _, tc := range tcs { + t.Run(tc.version, func(t *testing.T) { + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + tx.UnsafeCreateBucket(Meta) + UnsafeSetStorageVersion(tx, semver.New(tc.version)) + tx.Unlock() + be.ForceCommit() + be.Close() + db, err := bbolt.Open(tmpPath, 0400, &bbolt.Options{ReadOnly: true}) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + var ver *semver.Version + if err = db.View(func(tx *bbolt.Tx) error { + ver = ReadStorageVersionFromSnapshot(tx) + return nil + }); err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.expectVersion, ver.String()) + + }) + } +} diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 918cecacc..e4a72b65d 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -83,7 +84,7 @@ func BenchmarkConsistentIndex(b *testing.B) { tx := be.BatchTx() tx.Lock() - cindex.UnsafeCreateMetaBucket(tx) + buckets.UnsafeCreateMetaBucket(tx) ci.UnsafeSave(tx) tx.Unlock() diff --git a/server/verify/verify.go b/server/verify/verify.go index f727201ce..d27f77280 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -20,8 +20,8 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/datadir" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" wal2 "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" @@ -109,7 +109,7 @@ func MustVerifyIfEnabled(cfg Config) { func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { tx := be.BatchTx() - index, term := cindex.ReadConsistentIndex(tx) + index, term := buckets.ReadConsistentIndex(tx) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) }