Merge pull request #13190 from serathius/backend

Introduces Backend interfaces for alarm and auth bucket
This commit is contained in:
Piotr Tabor 2021-07-23 15:07:14 +02:00 committed by GitHub
commit 53d234f1fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 954 additions and 283 deletions

View File

@ -311,7 +311,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
be := backend.NewDefaultBackend(destDB)
defer be.Close()
ms := schema.NewMembershipStore(lg, be)
ms := schema.NewMembershipBackend(lg, be)
if err := ms.TrimClusterFromBackend(); err != nil {
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
}

View File

@ -306,7 +306,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
err = schema.NewMembershipStore(s.lg, be).TrimMembershipFromBackend()
err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
if err != nil {
return err
}
@ -403,7 +403,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
s.cl.SetStore(st)
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
s.cl.SetBackend(schema.NewMembershipStore(s.lg, be))
s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
for _, m := range s.cl.Members() {
s.cl.AddMember(m, true)
}

View File

@ -17,14 +17,11 @@ package auth
import (
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/pkg/v3/adt"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
user := schema.UnsafeGetUser(lg, tx, userName)
func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions {
user := tx.UnsafeGetUser(userName)
if user == nil {
return nil
}
@ -33,7 +30,7 @@ func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifie
writePerms := adt.NewIntervalTree()
for _, roleName := range user.Roles {
role := schema.UnsafeGetRole(lg, tx, roleName)
role := tx.UnsafeGetRole(roleName)
if role == nil {
continue
}
@ -106,11 +103,11 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}
func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {
perms := getMergedPerms(as.lg, tx, userName)
perms := getMergedPerms(tx, userName)
if perms == nil {
as.lg.Error(
"failed to create a merged permission",

View File

@ -28,8 +28,6 @@ import (
"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
@ -103,7 +101,7 @@ type AuthStore interface {
Authenticate(ctx context.Context, username, password string) (*pb.AuthenticateResponse, error)
// Recover recovers the state of auth store from the given backend
Recover(b backend.Backend)
Recover(be AuthBackend)
// UserAdd adds a new user
UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
@ -195,12 +193,44 @@ type TokenProvider interface {
genTokenPrefix() (string, error)
}
type AuthBackend interface {
CreateAuthBuckets()
ForceCommit()
BatchTx() AuthBatchTx
GetUser(string) *authpb.User
GetAllUsers() []*authpb.User
GetRole(string) *authpb.Role
GetAllRoles() []*authpb.Role
}
type AuthBatchTx interface {
AuthReadTx
UnsafeSaveAuthEnabled(enabled bool)
UnsafeSaveAuthRevision(rev uint64)
UnsafePutUser(*authpb.User)
UnsafeDeleteUser(string)
UnsafePutRole(*authpb.Role)
UnsafeDeleteRole(string)
}
type AuthReadTx interface {
UnsafeReadAuthEnabled() bool
UnsafeReadAuthRevision() uint64
UnsafeGetUser(string) *authpb.User
UnsafeGetRole(string) *authpb.Role
UnsafeGetAllUsers() []*authpb.User
UnsafeGetAllRoles() []*authpb.Role
Lock()
Unlock()
}
type authStore struct {
// atomic operations; need 64-bit align, or 32-bit tests will crash
revision uint64
lg *zap.Logger
be backend.Backend
be AuthBackend
enabled bool
enabledMu sync.RWMutex
@ -217,15 +247,14 @@ func (as *authStore) AuthEnable() error {
as.lg.Info("authentication is already enabled; ignored auth enable request")
return nil
}
b := as.be
tx := b.BatchTx()
tx := as.be.BatchTx()
tx.Lock()
defer func() {
tx.Unlock()
b.ForceCommit()
as.be.ForceCommit()
}()
u := schema.UnsafeGetUser(as.lg, tx, rootUser)
u := tx.UnsafeGetUser(rootUser)
if u == nil {
return ErrRootUserNotExist
}
@ -234,14 +263,13 @@ func (as *authStore) AuthEnable() error {
return ErrRootRoleNotExist
}
schema.UnsafeSaveAuthEnabled(tx, true)
tx.UnsafeSaveAuthEnabled(true)
as.enabled = true
as.tokenProvider.enable()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
as.setRevision(getRevision(tx))
as.setRevision(tx.UnsafeReadAuthRevision())
as.lg.Info("enabled authentication")
return nil
@ -254,11 +282,13 @@ func (as *authStore) AuthDisable() {
return
}
b := as.be
tx := b.BatchTx()
tx.Lock()
schema.UnsafeSaveAuthEnabled(tx, false)
tx.UnsafeSaveAuthEnabled(false)
as.commitRevision(tx)
tx.Unlock()
b.ForceCommit()
as.enabled = false
@ -281,12 +311,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
if !as.IsAuthEnabled() {
return nil, ErrAuthNotEnabled
}
tx := as.be.BatchTx()
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, username)
user := as.be.GetUser(username)
if user == nil {
return nil, ErrAuthFailed
}
@ -324,7 +349,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
tx.Lock()
defer tx.Unlock()
user = schema.UnsafeGetUser(as.lg, tx, username)
user = tx.UnsafeGetUser(username)
if user == nil {
return 0, ErrAuthFailed
}
@ -333,7 +358,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
return 0, ErrNoPasswordUser
}
return getRevision(tx), nil
return tx.UnsafeReadAuthRevision(), nil
}()
if err != nil {
return 0, err
@ -346,13 +371,13 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
return revision, nil
}
func (as *authStore) Recover(be backend.Backend) {
func (as *authStore) Recover(be AuthBackend) {
as.be = be
tx := be.BatchTx()
tx.Lock()
enabled := schema.UnsafeReadAuthEnabled(tx)
as.setRevision(getRevision(tx))
enabled := tx.UnsafeReadAuthEnabled()
as.setRevision(tx.UnsafeReadAuthRevision())
tx.Unlock()
@ -381,7 +406,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
user := tx.UnsafeGetUser(r.Name)
if user != nil {
return nil, ErrUserAlreadyExist
}
@ -408,8 +433,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
Password: password,
Options: options,
}
schema.UnsafePutUser(as.lg, tx, newUser)
tx.UnsafePutUser(newUser)
as.commitRevision(tx)
@ -427,12 +451,11 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
user := tx.UnsafeGetUser(r.Name)
if user == nil {
return nil, ErrUserNotFound
}
schema.UnsafeDeleteUser(tx, r.Name)
tx.UnsafeDeleteUser(r.Name)
as.commitRevision(tx)
@ -452,7 +475,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
user := tx.UnsafeGetUser(r.Name)
if user == nil {
return nil, ErrUserNotFound
}
@ -473,8 +496,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
Password: password,
Options: user.Options,
}
schema.UnsafePutUser(as.lg, tx, updatedUser)
tx.UnsafePutUser(updatedUser)
as.commitRevision(tx)
@ -494,13 +516,13 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, r.User)
user := tx.UnsafeGetUser(r.User)
if user == nil {
return nil, ErrUserNotFound
}
if r.Role != rootRole {
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
role := tx.UnsafeGetRole(r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -520,7 +542,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
user.Roles = append(user.Roles, r.Role)
sort.Strings(user.Roles)
schema.UnsafePutUser(as.lg, tx, user)
tx.UnsafePutUser(user)
as.invalidateCachedPerm(r.User)
@ -536,10 +558,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
}
func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
tx.Unlock()
user := as.be.GetUser(r.Name)
if user == nil {
return nil, ErrUserNotFound
@ -551,10 +570,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
}
func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
users := schema.UnsafeGetAllUsers(as.lg, tx)
tx.Unlock()
users := as.be.GetAllUsers()
resp := &pb.AuthUserListResponse{Users: make([]string, len(users))}
for i := range users {
@ -577,7 +593,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
user := tx.UnsafeGetUser(r.Name)
if user == nil {
return nil, ErrUserNotFound
}
@ -598,7 +614,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
return nil, ErrRoleNotGranted
}
schema.UnsafePutUser(as.lg, tx, updatedUser)
tx.UnsafePutUser(updatedUser)
as.invalidateCachedPerm(r.Name)
@ -615,13 +631,9 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
}
func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
defer tx.Unlock()
var resp pb.AuthRoleGetResponse
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
role := as.be.GetRole(r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -634,10 +646,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
}
func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
roles := schema.UnsafeGetAllRoles(as.lg, tx)
tx.Unlock()
roles := as.be.GetAllRoles()
resp := &pb.AuthRoleListResponse{Roles: make([]string, len(roles))}
for i := range roles {
@ -651,7 +660,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
tx.Lock()
defer tx.Unlock()
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
role := tx.UnsafeGetRole(r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -670,7 +679,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
return nil, ErrPermissionNotGranted
}
schema.UnsafePutRole(as.lg, tx, updatedRole)
tx.UnsafePutRole(updatedRole)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
@ -697,14 +706,14 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
tx.Lock()
defer tx.Unlock()
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
role := tx.UnsafeGetRole(r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
schema.UnsafeDeleteRole(tx, r.Role)
tx.UnsafeDeleteRole(r.Role)
users := schema.UnsafeGetAllUsers(as.lg, tx)
users := tx.UnsafeGetAllUsers()
for _, user := range users {
updatedUser := &authpb.User{
Name: user.Name,
@ -722,7 +731,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
continue
}
schema.UnsafePutUser(as.lg, tx, updatedUser)
tx.UnsafePutUser(updatedUser)
as.invalidateCachedPerm(string(user.Name))
}
@ -742,7 +751,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
tx.Lock()
defer tx.Unlock()
role := schema.UnsafeGetRole(as.lg, tx, r.Name)
role := tx.UnsafeGetRole(r.Name)
if role != nil {
return nil, ErrRoleAlreadyExist
}
@ -751,7 +760,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
Name: []byte(r.Name),
}
schema.UnsafePutRole(as.lg, tx, newRole)
tx.UnsafePutRole(newRole)
as.commitRevision(tx)
@ -786,7 +795,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
tx.Lock()
defer tx.Unlock()
role := schema.UnsafeGetRole(as.lg, tx, r.Name)
role := tx.UnsafeGetRole(r.Name)
if role == nil {
return nil, ErrRoleNotFound
}
@ -810,7 +819,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
sort.Sort(permSlice(role.KeyPermission))
}
schema.UnsafePutRole(as.lg, tx, role)
tx.UnsafePutRole(role)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
@ -850,7 +859,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
tx.Lock()
defer tx.Unlock()
user := schema.UnsafeGetUser(as.lg, tx, userName)
user := tx.UnsafeGetUser(userName)
if user == nil {
as.lg.Error("cannot find a user for permission check", zap.String("user-name", userName))
return ErrPermissionDenied
@ -888,10 +897,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx.Lock()
u := schema.UnsafeGetUser(as.lg, tx, authInfo.Username)
tx.Unlock()
u := as.be.GetUser(authInfo.Username)
if u == nil {
return ErrUserNotFound
@ -911,7 +917,7 @@ func (as *authStore) IsAuthEnabled() bool {
}
// NewAuthStore creates a new AuthStore.
func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore {
func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost int) *authStore {
if lg == nil {
lg = zap.NewNop()
}
@ -927,17 +933,12 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
bcryptCost = bcrypt.DefaultCost
}
be.CreateAuthBuckets()
tx := be.BatchTx()
tx.Lock()
schema.UnsafeCreateAuthBucket(tx)
tx.UnsafeCreateBucket(schema.AuthUsers)
tx.UnsafeCreateBucket(schema.AuthRoles)
enabled := schema.UnsafeReadAuthEnabled(tx)
enabled := tx.UnsafeReadAuthEnabled()
as := &authStore{
revision: getRevision(tx),
revision: tx.UnsafeReadAuthRevision(),
lg: lg,
be: be,
enabled: enabled,
@ -968,13 +969,9 @@ func hasRootRole(u *authpb.User) bool {
return idx != len(u.Roles) && u.Roles[idx] == rootRole
}
func (as *authStore) commitRevision(tx backend.BatchTx) {
func (as *authStore) commitRevision(tx AuthBatchTx) {
atomic.AddUint64(&as.revision, 1)
schema.UnsafeSaveAuthRevision(tx, as.Revision())
}
func getRevision(tx backend.BatchTx) uint64 {
return schema.UnsafeReadAuthRevision(tx)
tx.UnsafeSaveAuthRevision(as.Revision())
}
func (as *authStore) setRevision(rev uint64) {
@ -1169,7 +1166,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
func (as *authStore) HasRole(user, role string) bool {
tx := as.be.BatchTx()
tx.Lock()
u := schema.UnsafeGetUser(as.lg, tx, user)
u := tx.UnsafeGetUser(user)
tx.Unlock()
if u == nil {

View File

@ -0,0 +1,127 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import "go.etcd.io/etcd/api/v3/authpb"
type backendMock struct {
users map[string]*authpb.User
roles map[string]*authpb.Role
enabled bool
revision uint64
}
func newBackendMock() *backendMock {
return &backendMock{
users: make(map[string]*authpb.User),
roles: make(map[string]*authpb.Role),
}
}
func (b *backendMock) CreateAuthBuckets() {
}
func (b *backendMock) ForceCommit() {
}
func (b *backendMock) BatchTx() AuthBatchTx {
return &txMock{be: b}
}
func (b *backendMock) GetUser(s string) *authpb.User {
return b.users[s]
}
func (b *backendMock) GetAllUsers() []*authpb.User {
return b.BatchTx().UnsafeGetAllUsers()
}
func (b *backendMock) GetRole(s string) *authpb.Role {
return b.roles[s]
}
func (b *backendMock) GetAllRoles() []*authpb.Role {
return b.BatchTx().UnsafeGetAllRoles()
}
var _ AuthBackend = (*backendMock)(nil)
type txMock struct {
be *backendMock
}
var _ AuthBatchTx = (*txMock)(nil)
func (t txMock) UnsafeReadAuthEnabled() bool {
return t.be.enabled
}
func (t txMock) UnsafeReadAuthRevision() uint64 {
return t.be.revision
}
func (t txMock) UnsafeGetUser(s string) *authpb.User {
return t.be.users[s]
}
func (t txMock) UnsafeGetRole(s string) *authpb.Role {
return t.be.roles[s]
}
func (t txMock) UnsafeGetAllUsers() []*authpb.User {
users := []*authpb.User{}
for _, u := range t.be.users {
users = append(users, u)
}
return users
}
func (t txMock) UnsafeGetAllRoles() []*authpb.Role {
roles := []*authpb.Role{}
for _, r := range t.be.roles {
roles = append(roles, r)
}
return roles
}
func (t txMock) Lock() {
}
func (t txMock) Unlock() {
}
func (t txMock) UnsafeSaveAuthEnabled(enabled bool) {
t.be.enabled = enabled
}
func (t txMock) UnsafeSaveAuthRevision(rev uint64) {
t.be.revision = rev
}
func (t txMock) UnsafePutUser(user *authpb.User) {
t.be.users[string(user.Name)] = user
}
func (t txMock) UnsafeDeleteUser(s string) {
delete(t.be.users, s)
}
func (t txMock) UnsafePutRole(role *authpb.Role) {
t.be.roles[string(role.Name)] = role
}
func (t txMock) UnsafeDeleteRole(s string) {
delete(t.be.roles, s)
}

View File

@ -27,9 +27,6 @@ import (
"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc/metadata"
@ -46,25 +43,21 @@ func dummyIndexWaiter(index uint64) <-chan struct{} {
// TestNewAuthStoreRevision ensures newly auth store
// keeps the old revision when there are no changes.
func TestNewAuthStoreRevision(t *testing.T) {
b, tPath := betesting.NewDefaultTmpBackend(t)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
be := newBackendMock()
as := NewAuthStore(zap.NewExample(), be, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
old := as.Revision()
as.Close()
b.Close()
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
defer b2.Close()
as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost)
as = NewAuthStore(zap.NewExample(), be, tp, bcrypt.MinCost)
defer as.Close()
new := as.Revision()
@ -75,9 +68,6 @@ func TestNewAuthStoreRevision(t *testing.T) {
// TestNewAuthStoreBryptCost ensures that NewAuthStore uses default when given bcrypt-cost is invalid
func TestNewAuthStoreBcryptCost(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
if err != nil {
t.Fatal(err)
@ -85,7 +75,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {
invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, tp, invalidCost)
as := NewAuthStore(zap.NewExample(), newBackendMock(), tp, invalidCost)
defer as.Close()
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
@ -99,13 +89,11 @@ func encodePassword(s string) string {
}
func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testing.T)) {
b, _ := betesting.NewDefaultTmpBackend(t)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), newBackendMock(), tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
@ -124,7 +112,6 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin
}
tearDown := func(_ *testing.T) {
b.Close()
as.Close()
}
return as, tearDown
@ -693,14 +680,11 @@ func TestIsAuthEnabled(t *testing.T) {
// TestAuthRevisionRace ensures that access to authStore.revision is thread-safe.
func TestAuthInfoFromCtxRace(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), newBackendMock(), tp, bcrypt.MinCost)
defer as.Close()
donec := make(chan struct{})
@ -846,15 +830,12 @@ func TestHammerSimpleAuthenticate(t *testing.T) {
// TestRolesOrder tests authpb.User.Roles is sorted
func TestRolesOrder(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
defer tp.disable()
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), newBackendMock(), tp, bcrypt.MinCost)
defer as.Close()
err = enableAuthAndCreateRoot(as)
if err != nil {
@ -903,14 +884,11 @@ func TestAuthInfoFromCtxWithRootJWT(t *testing.T) {
// testAuthInfoFromCtxWithRoot ensures "WithRoot" properly embeds token in the context.
func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
tp, err := NewTokenProvider(zap.NewExample(), opts, dummyIndexWaiter, simpleTokenTTLDefault)
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), newBackendMock(), tp, bcrypt.MinCost)
defer as.Close()
if err = enableAuthAndCreateRoot(as); err != nil {

View File

@ -21,7 +21,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -30,6 +29,14 @@ type BackendGetter interface {
Backend() backend.Backend
}
type AlarmBackend interface {
CreateAlarmBucket()
MustPutAlarm(member *pb.AlarmMember)
MustDeleteAlarm(alarm *pb.AlarmMember)
GetAllAlarms() ([]*pb.AlarmMember, error)
ForceCommit()
}
type alarmSet map[types.ID]*pb.AlarmMember
// AlarmStore persists alarms to the backend.
@ -38,14 +45,14 @@ type AlarmStore struct {
mu sync.Mutex
types map[pb.AlarmType]alarmSet
bg BackendGetter
be AlarmBackend
}
func NewAlarmStore(lg *zap.Logger, bg BackendGetter) (*AlarmStore, error) {
func NewAlarmStore(lg *zap.Logger, be AlarmBackend) (*AlarmStore, error) {
if lg == nil {
lg = zap.NewNop()
}
ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), bg: bg}
ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), be: be}
err := ret.restore()
return ret, err
}
@ -59,7 +66,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
return m
}
schema.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm)
a.be.MustPutAlarm(newAlarm)
return newAlarm
}
@ -79,7 +86,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
delete(t, id)
schema.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m)
a.be.MustDeleteAlarm(m)
return m
}
@ -101,20 +108,15 @@ func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
}
func (a *AlarmStore) restore() error {
b := a.bg.Backend()
tx := b.BatchTx()
tx.Lock()
schema.UnsafeCreateAlarmBucket(tx)
ms, err := schema.UnsafeGetAllAlarms(tx)
tx.Unlock()
a.be.CreateAlarmBucket()
ms, err := a.be.GetAllAlarms()
if err != nil {
return err
}
for _, m := range ms {
a.addToMap(m)
}
b.ForceCommit()
a.be.ForceCommit()
return err
}

View File

@ -200,7 +200,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
remotes := existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, nil)
cl.SetID(br.wal.id, existingCluster.ID())
return &bootstrappedServer{
@ -240,7 +240,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
}
}
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
cl.SetID(br.wal.id, cl.ID())
return &bootstrappedServer{
@ -330,7 +330,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
}
r.raft.cl.SetStore(st)
r.raft.cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
r.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
r.raft.cl.Recover(api.UpdateCapability)
if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
bepath := cfg.BackendPath()

View File

@ -398,7 +398,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
defer func() {
@ -1059,7 +1059,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if s.authStore != nil {
lg.Info("restoring auth store")
s.authStore.Recover(newbe)
s.authStore.Recover(schema.NewAuthBackend(lg, newbe))
lg.Info("restored auth store")
}
@ -1075,7 +1075,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Info("restored v2 store")
s.cluster.SetBackend(schema.NewMembershipStore(lg, newbe))
s.cluster.SetBackend(schema.NewMembershipBackend(lg, newbe))
lg.Info("restoring cluster configuration")
@ -2361,7 +2361,7 @@ func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
func (s *EtcdServer) restoreAlarms() error {
s.applyV3 = s.newApplierV3()
as, err := v3alarm.NewAlarmStore(s.lg, s)
as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be))
if err != nil {
return err
}

View File

@ -1618,7 +1618,7 @@ func TestPublishV3(t *testing.T) {
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, 0),
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
@ -1689,7 +1689,7 @@ func TestPublishV3Retry(t *testing.T) {
cluster: &membership.RaftCluster{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, 0),
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
be: be,
ctx: ctx,
cancel: cancel,

View File

@ -123,7 +123,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(schema.Key)
tx.UnsafeCreateBucket(schema.Meta)
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
s.b.ForceCommit()
@ -340,7 +340,6 @@ func (s *store) restore() error {
s.lg.Info(
"restored last compact revision",
zap.Stringer("meta-bucket-name", schema.Meta),
zap.String("meta-bucket-name-key", string(schema.FinishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
@ -412,8 +411,6 @@ func (s *store) restore() error {
s.lg.Info(
"resume scheduled compaction",
zap.Stringer("meta-bucket-name", schema.Meta),
zap.String("meta-bucket-name-key", string(schema.ScheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)
}

View File

@ -91,7 +91,7 @@ func TestScheduleCompaction(t *testing.T) {
}
vals, _ := UnsafeReadFinishedCompact(tx)
if !reflect.DeepEqual(vals, tt.rev) {
t.Errorf("#%d: vals on %v = %+v, want %+v", i, schema.FinishedCompactKeyName, vals, tt.rev)
t.Errorf("#%d: finished compact equal %+v, want %+v", i, vals, tt.rev)
}
tx.Unlock()

View File

@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
UnsafeSetScheduledCompact(tx, 2)
tx.Unlock()
s0.Close()

View File

@ -20,41 +20,65 @@ import (
"go.uber.org/zap"
)
func UnsafeCreateAlarmBucket(tx backend.BatchTx) {
type alarmBackend struct {
lg *zap.Logger
be backend.Backend
}
func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {
return &alarmBackend{
lg: lg,
be: be,
}
}
func (s *alarmBackend) CreateAlarmBucket() {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(Alarm)
}
func MustPutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
MustUnsafePutAlarm(lg, tx, alarm)
s.mustUnsafePutAlarm(tx, alarm)
}
func MustUnsafePutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal()
if err != nil {
lg.Panic("failed to marshal alarm member", zap.Error(err))
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
}
tx.UnsafePut(Alarm, v, nil)
}
func MustDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
MustUnsafeDeleteAlarm(lg, tx, alarm)
s.mustUnsafeDeleteAlarm(tx, alarm)
}
func MustUnsafeDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal()
if err != nil {
lg.Panic("failed to marshal alarm member", zap.Error(err))
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
}
tx.UnsafeDelete(Alarm, v)
}
func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) {
tx := s.be.ReadTx()
tx.Lock()
defer tx.Unlock()
return s.unsafeGetAllAlarms(tx)
}
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
ms := []*etcdserverpb.AlarmMember{}
err := tx.UnsafeForEach(Alarm, func(k, v []byte) error {
var m etcdserverpb.AlarmMember
@ -66,3 +90,7 @@ func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error)
})
return ms, err
}
func (s alarmBackend) ForceCommit() {
s.be.ForceCommit()
}

View File

@ -17,6 +17,10 @@ package schema
import (
"bytes"
"encoding/binary"
"go.uber.org/zap"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/storage/backend"
)
@ -29,20 +33,60 @@ var (
authDisabled = []byte{0}
)
func UnsafeCreateAuthBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Auth)
type authBackend struct {
be backend.Backend
lg *zap.Logger
}
func UnsafeSaveAuthEnabled(tx backend.BatchTx, enabled bool) {
if enabled {
tx.UnsafePut(Auth, AuthEnabledKeyName, authEnabled)
} else {
tx.UnsafePut(Auth, AuthEnabledKeyName, authDisabled)
var _ auth.AuthBackend = (*authBackend)(nil)
func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend {
return &authBackend{
be: be,
lg: lg,
}
}
func UnsafeReadAuthEnabled(tx backend.ReadTx) bool {
_, vs := tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
func (abe *authBackend) CreateAuthBuckets() {
tx := abe.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(Auth)
tx.UnsafeCreateBucket(AuthUsers)
tx.UnsafeCreateBucket(AuthRoles)
}
func (abe *authBackend) ForceCommit() {
abe.be.ForceCommit()
}
func (abe *authBackend) BatchTx() auth.AuthBatchTx {
return &authBatchTx{tx: abe.be.BatchTx(), lg: abe.lg}
}
type authBatchTx struct {
tx backend.BatchTx
lg *zap.Logger
}
var _ auth.AuthBatchTx = (*authBatchTx)(nil)
func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) {
if enabled {
atx.tx.UnsafePut(Auth, AuthEnabledKeyName, authEnabled)
} else {
atx.tx.UnsafePut(Auth, AuthEnabledKeyName, authDisabled)
}
}
func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) {
revBytes := make([]byte, revBytesLen)
binary.BigEndian.PutUint64(revBytes, rev)
atx.tx.UnsafePut(Auth, AuthRevisionKeyName, revBytes)
}
func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
_, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
return true
@ -51,17 +95,19 @@ func UnsafeReadAuthEnabled(tx backend.ReadTx) bool {
return false
}
func UnsafeSaveAuthRevision(tx backend.BatchTx, rev uint64) {
revBytes := make([]byte, revBytesLen)
binary.BigEndian.PutUint64(revBytes, rev)
tx.UnsafePut(Auth, AuthRevisionKeyName, revBytes)
}
func UnsafeReadAuthRevision(tx backend.ReadTx) uint64 {
_, vs := tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
_, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0
}
return binary.BigEndian.Uint64(vs[0])
}
func (atx *authBatchTx) Lock() {
atx.tx.Lock()
}
func (atx *authBatchTx) Unlock() {
atx.tx.Unlock()
}

View File

@ -20,8 +20,19 @@ import (
"go.uber.org/zap"
)
func UnsafeGetRole(lg *zap.Logger, tx backend.BatchTx, roleName string) *authpb.Role {
_, vs := tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(AuthRoles)
}
func (abe *authBackend) GetRole(roleName string) *authpb.Role {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetRole(roleName)
}
func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role {
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
if len(vs) == 0 {
return nil
}
@ -29,13 +40,20 @@ func UnsafeGetRole(lg *zap.Logger, tx backend.BatchTx, roleName string) *authpb.
role := &authpb.Role{}
err := role.Unmarshal(vs[0])
if err != nil {
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
}
return role
}
func UnsafeGetAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
_, vs := tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
func (abe *authBackend) GetAllRoles() []*authpb.Role {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllRoles()
}
func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role {
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
@ -45,26 +63,26 @@ func UnsafeGetAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
role := &authpb.Role{}
err := role.Unmarshal(vs[i])
if err != nil {
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
}
roles[i] = role
}
return roles
}
func UnsafePutRole(lg *zap.Logger, tx backend.BatchTx, role *authpb.Role) {
func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) {
b, err := role.Marshal()
if err != nil {
lg.Panic(
atx.lg.Panic(
"failed to marshal 'authpb.Role'",
zap.String("role-name", string(role.Name)),
zap.Error(err),
)
}
tx.UnsafePut(AuthRoles, role.Name, b)
atx.tx.UnsafePut(AuthRoles, role.Name, b)
}
func UnsafeDeleteRole(tx backend.BatchTx, rolename string) {
tx.UnsafeDelete(AuthRoles, []byte(rolename))
func (atx *authBatchTx) UnsafeDeleteRole(rolename string) {
atx.tx.UnsafeDelete(AuthRoles, []byte(rolename))
}

View File

@ -0,0 +1,228 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
func TestGetAllRoles(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
want []*authpb.Role
}{
{
name: "Empty by default",
setup: func(tx auth.AuthBatchTx) {},
want: nil,
},
{
name: "Returns data put before",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("readKey"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
Key: []byte("key"),
RangeEnd: []byte("end"),
},
},
})
},
want: []*authpb.Role{
{
Name: []byte("readKey"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
Key: []byte("key"),
RangeEnd: []byte("end"),
},
},
},
},
},
{
name: "Skips deleted",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
})
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role2"),
})
tx.UnsafeDeleteRole("role1")
},
want: []*authpb.Role{{Name: []byte("role2")}},
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
},
},
})
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role2"),
})
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READWRITE,
},
},
})
},
want: []*authpb.Role{
{Name: []byte("role1"), KeyPermission: []*authpb.Permission{{PermType: authpb.READWRITE}}},
{Name: []byte("role2")},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
abe := NewAuthBackend(zaptest.NewLogger(t), be)
abe.CreateAuthBuckets()
tx := abe.BatchTx()
tx.Lock()
tc.setup(tx)
tx.Unlock()
abe.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
users := abe2.GetAllRoles()
assert.Equal(t, tc.want, users)
})
}
}
func TestGetRole(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
want *authpb.Role
}{
{
name: "Returns nil for missing",
setup: func(tx auth.AuthBatchTx) {},
want: nil,
},
{
name: "Returns data put before",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
Key: []byte("key"),
RangeEnd: []byte("end"),
},
},
})
},
want: &authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
Key: []byte("key"),
RangeEnd: []byte("end"),
},
},
},
},
{
name: "Return nil for deleted",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
})
tx.UnsafeDeleteRole("role1")
},
want: nil,
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READ,
},
},
})
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
{
PermType: authpb.READWRITE,
},
},
})
},
want: &authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{{PermType: authpb.READWRITE}},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
abe := NewAuthBackend(zaptest.NewLogger(t), be)
abe.CreateAuthBuckets()
tx := abe.BatchTx()
tx.Lock()
tc.setup(tx)
tx.Unlock()
abe.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
users := abe2.GetRole("role1")
assert.Equal(t, tc.want, users)
})
}
}

View File

@ -15,12 +15,13 @@
package schema
import (
"fmt"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
@ -28,34 +29,51 @@ import (
// TestAuthEnabled ensures that UnsafeSaveAuthEnabled&UnsafeReadAuthEnabled work well together.
func TestAuthEnabled(t *testing.T) {
tcs := []struct {
enabled bool
name string
skipSetting bool
setEnabled bool
wantEnabled bool
}{
{
enabled: true,
name: "Returns true after setting true",
setEnabled: true,
wantEnabled: true,
},
{
enabled: false,
name: "Returns false after setting false",
setEnabled: false,
wantEnabled: false,
},
{
name: "Returns false by default",
skipSetting: true,
wantEnabled: false,
},
}
for _, tc := range tcs {
t.Run(fmt.Sprint(tc.enabled), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
abe := NewAuthBackend(zaptest.NewLogger(t), be)
tx := abe.BatchTx()
abe.CreateAuthBuckets()
tx.Lock()
UnsafeCreateAuthBucket(tx)
UnsafeSaveAuthEnabled(tx, tc.enabled)
if !tc.skipSetting {
tx.UnsafeSaveAuthEnabled(tc.setEnabled)
}
tx.Unlock()
be.ForceCommit()
abe.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := UnsafeReadAuthEnabled(b.BatchTx())
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
tx = abe2.BatchTx()
tx.Lock()
defer tx.Unlock()
v := tx.UnsafeReadAuthEnabled()
assert.Equal(t, tc.enabled, v)
assert.Equal(t, tc.wantEnabled, v)
})
}
}
@ -63,37 +81,49 @@ func TestAuthEnabled(t *testing.T) {
// TestAuthRevision ensures that UnsafeSaveAuthRevision&UnsafeReadAuthRevision work well together.
func TestAuthRevision(t *testing.T) {
tcs := []struct {
revision uint64
name string
setRevision uint64
wantRevision uint64
}{
{
revision: 0,
name: "Returns 0 by default",
wantRevision: 0,
},
{
revision: 1,
name: "Returns 1 after setting 1",
setRevision: 1,
wantRevision: 1,
},
{
revision: math.MaxUint64,
name: "Returns max int after setting max int",
setRevision: math.MaxUint64,
wantRevision: math.MaxUint64,
},
}
for _, tc := range tcs {
t.Run(fmt.Sprint(tc.revision), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
abe := NewAuthBackend(zaptest.NewLogger(t), be)
abe.CreateAuthBuckets()
if tc.setRevision != 0 {
tx := abe.BatchTx()
tx.Lock()
tx.UnsafeSaveAuthRevision(tc.setRevision)
tx.Unlock()
}
tx.Lock()
UnsafeCreateAuthBucket(tx)
UnsafeSaveAuthRevision(tx, tc.revision)
tx.Unlock()
be.ForceCommit()
abe.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
v := UnsafeReadAuthRevision(b.BatchTx())
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
tx := abe2.BatchTx()
tx.Lock()
defer tx.Unlock()
v := tx.UnsafeReadAuthRevision()
assert.Equal(t, tc.revision, v)
assert.Equal(t, tc.wantRevision, v)
})
}
}

View File

@ -16,12 +16,18 @@ package schema
import (
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.uber.org/zap"
)
func UnsafeGetUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
func (abe *authBackend) GetUser(username string) *authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetUser(username)
}
func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
}
@ -29,7 +35,7 @@ func UnsafeGetUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.
user := &authpb.User{}
err := user.Unmarshal(vs[0])
if err != nil {
lg.Panic(
atx.lg.Panic(
"failed to unmarshal 'authpb.User'",
zap.String("user-name", username),
zap.Error(err),
@ -38,8 +44,15 @@ func UnsafeGetUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.
return user
}
func UnsafeGetAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
_, vs := tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1)
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
@ -49,21 +62,21 @@ func UnsafeGetAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
user := &authpb.User{}
err := user.Unmarshal(vs[i])
if err != nil {
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
}
users[i] = user
}
return users
}
func UnsafePutUser(lg *zap.Logger, tx backend.BatchTx, user *authpb.User) {
func (atx *authBatchTx) UnsafePutUser(user *authpb.User) {
b, err := user.Marshal()
if err != nil {
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
}
tx.UnsafePut(AuthUsers, user.Name, b)
atx.tx.UnsafePut(AuthUsers, user.Name, b)
}
func UnsafeDeleteUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(AuthUsers, []byte(username))
func (atx *authBatchTx) UnsafeDeleteUser(username string) {
atx.tx.UnsafeDelete(AuthUsers, []byte(username))
}

View File

@ -0,0 +1,204 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
func TestGetAllUsers(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
want []*authpb.User
}{
{
name: "Empty by default",
setup: func(tx auth.AuthBatchTx) {},
want: nil,
},
{
name: "Returns user put before",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("alicePassword"),
Roles: []string{"aliceRole1", "aliceRole2"},
Options: &authpb.UserAddOptions{
NoPassword: true,
},
})
},
want: []*authpb.User{
{
Name: []byte("alice"),
Password: []byte("alicePassword"),
Roles: []string{"aliceRole1", "aliceRole2"},
Options: &authpb.UserAddOptions{
NoPassword: true,
},
},
},
},
{
name: "Skips deleted user",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
})
tx.UnsafePutUser(&authpb.User{
Name: []byte("bob"),
})
tx.UnsafeDeleteUser("alice")
},
want: []*authpb.User{{Name: []byte("bob")}},
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("oldPassword"),
})
tx.UnsafePutUser(&authpb.User{
Name: []byte("bob"),
})
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("newPassword"),
})
},
want: []*authpb.User{
{Name: []byte("alice"), Password: []byte("newPassword")},
{Name: []byte("bob")},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
abe := NewAuthBackend(zaptest.NewLogger(t), be)
abe.CreateAuthBuckets()
tx := abe.BatchTx()
tx.Lock()
tc.setup(tx)
tx.Unlock()
abe.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
users := abe2.GetAllUsers()
assert.Equal(t, tc.want, users)
})
}
}
func TestGetUser(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
want *authpb.User
}{
{
name: "Returns nil for missing user",
setup: func(tx auth.AuthBatchTx) {},
want: nil,
},
{
name: "Returns data put before",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("alicePassword"),
Roles: []string{"aliceRole1", "aliceRole2"},
Options: &authpb.UserAddOptions{
NoPassword: true,
},
})
},
want: &authpb.User{
Name: []byte("alice"),
Password: []byte("alicePassword"),
Roles: []string{"aliceRole1", "aliceRole2"},
Options: &authpb.UserAddOptions{
NoPassword: true,
},
},
},
{
name: "Skips deleted",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
})
tx.UnsafeDeleteUser("alice")
},
want: nil,
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("oldPassword"),
})
tx.UnsafePutUser(&authpb.User{
Name: []byte("alice"),
Password: []byte("newPassword"),
})
},
want: &authpb.User{
Name: []byte("alice"),
Password: []byte("newPassword"),
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
abe := NewAuthBackend(zaptest.NewLogger(t), be)
abe.CreateAuthBuckets()
tx := abe.BatchTx()
tx.Lock()
tc.setup(tx)
tx.Unlock()
abe.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(zaptest.NewLogger(t), be2)
users := abe2.GetUser("alice")
assert.Equal(t, tc.want, users)
})
}
}

View File

@ -31,19 +31,19 @@ const (
MemberRaftAttributesSuffix = "raftAttributes"
)
type membershipStore struct {
type membershipBackend struct {
lg *zap.Logger
be backend.Backend
}
func NewMembershipStore(lg *zap.Logger, be backend.Backend) *membershipStore {
return &membershipStore{
func NewMembershipBackend(lg *zap.Logger, be backend.Backend) *membershipBackend {
return &membershipBackend{
lg: lg,
be: be,
}
}
func (s *membershipStore) MustSaveMemberToBackend(m *membership.Member) {
func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
mkey := BackendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
@ -58,7 +58,7 @@ func (s *membershipStore) MustSaveMemberToBackend(m *membership.Member) {
// TrimClusterFromBackend removes all information about cluster (versions)
// from the v3 backend.
func (s *membershipStore) TrimClusterFromBackend() error {
func (s *membershipBackend) TrimClusterFromBackend() error {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
@ -66,7 +66,7 @@ func (s *membershipStore) TrimClusterFromBackend() error {
return nil
}
func (s *membershipStore) MustDeleteMemberFromBackend(id types.ID) {
func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) {
mkey := BackendMemberKey(id)
tx := s.be.BatchTx()
@ -76,7 +76,7 @@ func (s *membershipStore) MustDeleteMemberFromBackend(id types.ID) {
tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))
}
func (s *membershipStore) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) {
func (s *membershipBackend) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) {
members, removed, err := s.readMembersFromBackend()
if err != nil {
s.lg.Panic("couldn't read members from backend", zap.Error(err))
@ -84,7 +84,7 @@ func (s *membershipStore) MustReadMembersFromBackend() (map[types.ID]*membership
return members, removed
}
func (s *membershipStore) readMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool, error) {
func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool, error) {
members := make(map[types.ID]*membership.Member)
removed := make(map[types.ID]bool)
@ -117,7 +117,7 @@ func (s *membershipStore) readMembersFromBackend() (map[types.ID]*membership.Mem
// TrimMembershipFromBackend removes all information about members &
// removed_members from the v3 backend.
func (s *membershipStore) TrimMembershipFromBackend() error {
func (s *membershipBackend) TrimMembershipFromBackend() error {
s.lg.Info("Trimming membership information from the backend...")
tx := s.be.BatchTx()
tx.Lock()
@ -141,7 +141,7 @@ func (s *membershipStore) TrimMembershipFromBackend() error {
// MustSaveClusterVersionToBackend saves cluster version to backend.
// The field is populated since etcd v3.5.
func (s *membershipStore) MustSaveClusterVersionToBackend(ver *semver.Version) {
func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version) {
ckey := ClusterClusterVersionKeyName
tx := s.be.BatchTx()
@ -152,7 +152,7 @@ func (s *membershipStore) MustSaveClusterVersionToBackend(ver *semver.Version) {
// MustSaveDowngradeToBackend saves downgrade info to backend.
// The field is populated since etcd v3.5.
func (s *membershipStore) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) {
func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) {
dkey := ClusterDowngradeKeyName
dvalue, err := json.Marshal(downgrade)
if err != nil {
@ -164,7 +164,7 @@ func (s *membershipStore) MustSaveDowngradeToBackend(downgrade *membership.Downg
tx.UnsafePut(Cluster, dkey, dvalue)
}
func (s *membershipStore) MustCreateBackendBuckets() {
func (s *membershipBackend) MustCreateBackendBuckets() {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
@ -183,7 +183,7 @@ func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
// ClusterVersionFromBackend reads cluster version from backend.
// The field is populated since etcd v3.5.
func (s *membershipStore) ClusterVersionFromBackend() *semver.Version {
func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version {
ckey := ClusterClusterVersionKeyName
tx := s.be.ReadTx()
tx.RLock()
@ -203,7 +203,7 @@ func (s *membershipStore) ClusterVersionFromBackend() *semver.Version {
// DowngradeInfoFromBackend reads downgrade info from backend.
// The field is populated since etcd v3.5.
func (s *membershipStore) DowngradeInfoFromBackend() *membership.DowngradeInfo {
func (s *membershipBackend) DowngradeInfoFromBackend() *membership.DowngradeInfo {
dkey := ClusterDowngradeKeyName
tx := s.be.ReadTx()
tx.Lock()

View File

@ -55,11 +55,11 @@ func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, e
}
confstate := UnsafeConfStateFromBackend(lg, tx)
if confstate == nil {
return nil, fmt.Errorf("missing %q key", MetaConfStateName)
return nil, fmt.Errorf("missing confstate information")
}
_, term := UnsafeReadConsistentIndex(tx)
if term == 0 {
return nil, fmt.Errorf("missing %q key", MetaTermKeyName)
return nil, fmt.Errorf("missing term information")
}
copied := V3_5
return &copied, nil

View File

@ -31,48 +31,63 @@ func TestUpdateStorageVersion(t *testing.T) {
tcs := []struct {
name string
version string
metaKeys [][]byte
setupKeys func(tx backend.BatchTx)
expectVersion *semver.Version
expectError bool
expectedErrorMsg string
}{
{
name: `Backend before 3.6 without "confState" should be rejected`,
name: `Backend before 3.6 without confstate should be rejected`,
version: "",
expectVersion: nil,
setupKeys: func(tx backend.BatchTx) {},
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "confState" key`,
expectedErrorMsg: `cannot determine storage version: missing confstate information`,
},
{
name: `Backend before 3.6 without "term" should be rejected`,
version: "",
metaKeys: [][]byte{MetaConfStateName},
name: `Backend before 3.6 without term should be rejected`,
version: "",
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
},
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
expectedErrorMsg: `cannot determine storage version: missing term information`,
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
name: "Backend with current version should be skipped",
version: version.Version,
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
setupKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
},
expectVersion: &semver.Version{Major: 3, Minor: 7},
},
}
@ -86,16 +101,7 @@ func TestUpdateStorageVersion(t *testing.T) {
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
switch string(k) {
case string(MetaConfStateName):
MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(MetaTermKeyName):
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
default:
tx.UnsafePut(Meta, k, []byte{})
}
}
tc.setupKeys(tx)
if tc.version != "" {
UnsafeSetStorageVersion(tx, semver.New(tc.version))
}