mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Move all named keys to buckets module
This commit is contained in:
@@ -40,11 +40,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
enableFlagKey = []byte("authEnabled")
|
||||
authEnabled = []byte{1}
|
||||
authDisabled = []byte{0}
|
||||
|
||||
revisionKey = []byte("authRevision")
|
||||
authEnabled = []byte{1}
|
||||
authDisabled = []byte{0}
|
||||
|
||||
rootPerm = authpb.Permission{PermType: authpb.READWRITE, Key: []byte{}, RangeEnd: []byte{0}}
|
||||
|
||||
@@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error {
|
||||
return ErrRootRoleNotExist
|
||||
}
|
||||
|
||||
tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled)
|
||||
tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authEnabled)
|
||||
|
||||
as.enabled = true
|
||||
as.tokenProvider.enable()
|
||||
@@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() {
|
||||
b := as.be
|
||||
tx := b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
|
||||
tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authDisabled)
|
||||
as.commitRevision(tx)
|
||||
tx.Unlock()
|
||||
b.ForceCommit()
|
||||
@@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) {
|
||||
as.be = be
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0)
|
||||
if len(vs) == 1 {
|
||||
if bytes.Equal(vs[0], authEnabled) {
|
||||
enabled = true
|
||||
@@ -1041,7 +1038,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
|
||||
tx.UnsafeCreateBucket(buckets.AuthRoles)
|
||||
|
||||
enabled := false
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0)
|
||||
if len(vs) == 1 {
|
||||
if bytes.Equal(vs[0], authEnabled) {
|
||||
enabled = true
|
||||
@@ -1084,11 +1081,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
|
||||
atomic.AddUint64(&as.revision, 1)
|
||||
revBytes := make([]byte, revBytesLen)
|
||||
binary.BigEndian.PutUint64(revBytes, as.Revision())
|
||||
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
|
||||
tx.UnsafePut(buckets.Auth, buckets.AuthRevisionKeyName, revBytes)
|
||||
}
|
||||
|
||||
func getRevision(tx backend.BatchTx) uint64 {
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthRevisionKeyName, nil, 0)
|
||||
if len(vs) != 1 {
|
||||
// this can happen in the initialization phase
|
||||
return 0
|
||||
|
||||
@@ -697,7 +697,7 @@ func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
|
||||
ckey := backendClusterVersionKey()
|
||||
ckey := buckets.ClusterClusterVersionKeyName
|
||||
tx := be.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
@@ -716,7 +716,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
|
||||
dkey := backendDowngradeKey()
|
||||
dkey := buckets.ClusterDowngradeKeyName
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
|
||||
@@ -42,7 +42,7 @@ var (
|
||||
)
|
||||
|
||||
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
|
||||
mkey := backendMemberKey(m.ID)
|
||||
mkey := buckets.BackendMemberKey(m.ID)
|
||||
mvalue, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal member", zap.Error(err))
|
||||
@@ -65,7 +65,7 @@ func TrimClusterFromBackend(be backend.Backend) error {
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
|
||||
mkey := backendMemberKey(id)
|
||||
mkey := buckets.BackendMemberKey(id)
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
@@ -160,7 +160,7 @@ func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
|
||||
ckey := backendClusterVersionKey()
|
||||
ckey := buckets.ClusterClusterVersionKeyName
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
@@ -170,7 +170,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
|
||||
dkey := backendDowngradeKey()
|
||||
dkey := buckets.ClusterDowngradeKeyName
|
||||
dvalue, err := json.Marshal(downgrade)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal downgrade information", zap.Error(err))
|
||||
@@ -281,18 +281,6 @@ func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func backendMemberKey(id types.ID) []byte {
|
||||
return []byte(id.String())
|
||||
}
|
||||
|
||||
func backendClusterVersionKey() []byte {
|
||||
return []byte("clusterVersion")
|
||||
}
|
||||
|
||||
func backendDowngradeKey() []byte {
|
||||
return []byte("downgrade")
|
||||
}
|
||||
|
||||
func mustCreateBackendBuckets(be backend.Backend) {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
|
||||
@@ -17,6 +17,7 @@ package buckets
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
)
|
||||
|
||||
@@ -67,11 +68,17 @@ func (b bucket) String() string { return string(b.Name()) }
|
||||
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
|
||||
|
||||
var (
|
||||
// Since v3.0
|
||||
// Pre v3.5
|
||||
ScheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
FinishedCompactKeyName = []byte("finishedCompactRev")
|
||||
MetaConsistentIndexKeyName = []byte("consistent_index")
|
||||
AuthEnabledKeyName = []byte("authEnabled")
|
||||
AuthRevisionKeyName = []byte("authRevision")
|
||||
// Since v3.5
|
||||
MetaTermKeyName = []byte("term")
|
||||
MetaConfStateName = []byte("confState")
|
||||
MetaTermKeyName = []byte("term")
|
||||
MetaConfStateName = []byte("confState")
|
||||
ClusterClusterVersionKeyName = []byte("clusterVersion")
|
||||
ClusterDowngradeKeyName = []byte("downgrade")
|
||||
// Since v3.6
|
||||
MetaStorageVersionName = []byte("storageVersion")
|
||||
// Before adding new meta key please update server/etcdserver/version
|
||||
@@ -84,3 +91,7 @@ func DefaultIgnores(bucket, key []byte) bool {
|
||||
return bytes.Compare(bucket, Meta.Name()) == 0 &&
|
||||
(bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0)
|
||||
}
|
||||
|
||||
func BackendMemberKey(id types.ID) []byte {
|
||||
return []byte(id.String())
|
||||
}
|
||||
|
||||
@@ -34,9 +34,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||
|
||||
ErrCompacted = errors.New("mvcc: required revision has been compacted")
|
||||
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
|
||||
)
|
||||
@@ -244,7 +241,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
|
||||
tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
|
||||
tx.Unlock()
|
||||
// ensure that desired compaction is persisted
|
||||
s.b.ForceCommit()
|
||||
@@ -342,7 +339,7 @@ func (s *store) restore() error {
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
|
||||
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
|
||||
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
s.revMu.Lock()
|
||||
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
|
||||
@@ -350,12 +347,12 @@ func (s *store) restore() error {
|
||||
s.lg.Info(
|
||||
"restored last compact revision",
|
||||
zap.Stringer("meta-bucket-name", buckets.Meta),
|
||||
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
|
||||
zap.String("meta-bucket-name-key", string(buckets.FinishedCompactKeyName)),
|
||||
zap.Int64("restored-compact-revision", s.compactMainRev),
|
||||
)
|
||||
s.revMu.Unlock()
|
||||
}
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
|
||||
@@ -427,7 +424,7 @@ func (s *store) restore() error {
|
||||
s.lg.Info(
|
||||
"resume scheduled compaction",
|
||||
zap.Stringer("meta-bucket-name", buckets.Meta),
|
||||
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
|
||||
zap.String("meta-bucket-name-key", string(buckets.ScheduledCompactKeyName)),
|
||||
zap.Int64("scheduled-compact-revision", scheduledCompact),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
if len(keys) < batchNum {
|
||||
rbytes := make([]byte, 8+1+8)
|
||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
|
||||
tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes)
|
||||
tx.Unlock()
|
||||
s.lg.Info(
|
||||
"finished scheduled compaction",
|
||||
|
||||
@@ -89,10 +89,10 @@ func TestScheduleCompaction(t *testing.T) {
|
||||
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
|
||||
}
|
||||
}
|
||||
_, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
|
||||
_, vals := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
|
||||
revToBytes(revision{main: tt.rev}, ibytes)
|
||||
if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) {
|
||||
t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w)
|
||||
t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, w)
|
||||
}
|
||||
tx.Unlock()
|
||||
|
||||
|
||||
@@ -343,10 +343,10 @@ func TestStoreCompact(t *testing.T) {
|
||||
end := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(end, uint64(4))
|
||||
wact := []testutil.Action{
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
|
||||
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
}
|
||||
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
||||
t.Errorf("tx actions = %+v, want %+v", g, wact)
|
||||
@@ -384,8 +384,8 @@ func TestStoreRestore(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{buckets.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{buckets.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
|
||||
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
|
||||
b.tx.rangeRespc <- rangeResp{nil, nil}
|
||||
@@ -399,8 +399,8 @@ func TestStoreRestore(t *testing.T) {
|
||||
t.Errorf("current rev = %v, want 5", s.currentRev)
|
||||
}
|
||||
wact := []testutil.Action{
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, []byte(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, []byte(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
|
||||
}
|
||||
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
||||
@@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
||||
revToBytes(revision{main: 2}, rbytes)
|
||||
tx := s0.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
|
||||
tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
|
||||
tx.Unlock()
|
||||
|
||||
s0.Close()
|
||||
|
||||
Reference in New Issue
Block a user