Merge pull request #13954 from mitake/perm-cache-lock

server/auth: protect rangePermCache with a mutex
This commit is contained in:
Benjamin Wang 2022-07-03 07:01:46 +08:00 committed by GitHub
commit 33347b6845
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 153 additions and 51 deletions

View File

@ -103,34 +103,49 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}
func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
as.rangePermCacheMu.RLock()
defer as.rangePermCacheMu.RUnlock()
rangePerm, ok := as.rangePermCache[userName]
if !ok {
as.lg.Error(
"user doesn't exist",
zap.String("user-name", userName),
)
return false
}
if len(rangeEnd) == 0 {
return checkKeyPoint(as.lg, rangePerm, key, permtyp)
}
return checkKeyInterval(as.lg, rangePerm, key, rangeEnd, permtyp)
}
func (as *authStore) refreshRangePermCache(tx AuthReadTx) {
// Note that every authentication configuration update calls this method and it invalidates the entire
// rangePermCache and reconstruct it based on information of users and roles stored in the backend.
// This can be a costly operation.
as.rangePermCacheMu.Lock()
defer as.rangePermCacheMu.Unlock()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
users := tx.UnsafeGetAllUsers()
for _, user := range users {
userName := string(user.Name)
perms := getMergedPerms(tx, userName)
if perms == nil {
as.lg.Error(
"failed to create a merged permission",
zap.String("user-name", userName),
)
return false
continue
}
as.rangePermCache[userName] = perms
}
if len(rangeEnd) == 0 {
return checkKeyPoint(as.lg, as.rangePermCache[userName], key, permtyp)
}
return checkKeyInterval(as.lg, as.rangePermCache[userName], key, rangeEnd, permtyp)
}
func (as *authStore) clearCachedPerm() {
as.rangePermCache = make(map[string]*unifiedRangePermissions)
}
func (as *authStore) invalidateCachedPerm(userName string) {
delete(as.rangePermCache, userName)
}
type unifiedRangePermissions struct {

View File

@ -235,7 +235,14 @@ type authStore struct {
enabled bool
enabledMu sync.RWMutex
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
// rangePermCache needs to be protected by rangePermCacheMu
// rangePermCacheMu needs to be write locked only in initialization phase or configuration changes
// Hot paths like Range(), needs to acquire read lock for improving performance
//
// Note that BatchTx and ReadTx cannot be a mutex for rangePermCache because they are independent resources
// see also: https://github.com/etcd-io/etcd/pull/13920#discussion_r849114855
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
rangePermCacheMu sync.RWMutex
tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
@ -268,7 +275,7 @@ func (as *authStore) AuthEnable() error {
as.enabled = true
as.tokenProvider.enable()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
as.refreshRangePermCache(tx)
as.setRevision(tx.UnsafeReadAuthRevision())
@ -437,6 +444,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
tx.UnsafePutUser(newUser)
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info("added a user", zap.String("user-name", r.Name))
return &pb.AuthUserAddResponse{}, nil
@ -459,8 +467,8 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
tx.UnsafeDeleteUser(r.Name)
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
as.lg.Info(
@ -500,8 +508,8 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
tx.UnsafePutUser(updatedUser)
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
as.lg.Info(
@ -545,9 +553,8 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
tx.UnsafePutUser(user)
as.invalidateCachedPerm(r.User)
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info(
"granted a role to a user",
@ -617,9 +624,8 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
tx.UnsafePutUser(updatedUser)
as.invalidateCachedPerm(r.Name)
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info(
"revoked a role from a user",
@ -682,11 +688,8 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
tx.UnsafePutRole(updatedRole)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
as.clearCachedPerm()
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info(
"revoked a permission on range",
@ -734,10 +737,10 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
tx.UnsafePutUser(updatedUser)
as.invalidateCachedPerm(string(user.Name))
}
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info("deleted a role", zap.String("role-name", r.Role))
return &pb.AuthRoleDeleteResponse{}, nil
@ -822,11 +825,8 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
tx.UnsafePutRole(role)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
as.clearCachedPerm()
as.commitRevision(tx)
as.refreshRangePermCache(tx)
as.lg.Info(
"granted/updated a permission to a user",
@ -871,7 +871,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return nil
}
if as.isRangeOpPermitted(tx, userName, key, rangeEnd, permTyp) {
if as.isRangeOpPermitted(userName, key, rangeEnd, permTyp) {
return nil
}

View File

@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/v3/adt"
"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc/metadata"
)
@ -141,7 +142,8 @@ func TestUserAdd(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)
ua := &pb.AuthUserAddRequest{Name: "foo", Options: &authpb.UserAddOptions{NoPassword: false}}
const userName = "foo"
ua := &pb.AuthUserAddRequest{Name: userName, Options: &authpb.UserAddOptions{NoPassword: false}}
_, err := as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
@ -155,6 +157,11 @@ func TestUserAdd(t *testing.T) {
if err != ErrUserEmpty {
t.Fatal(err)
}
if _, ok := as.rangePermCache[userName]; !ok {
t.Fatalf("user %s should be added but it doesn't exist in rangePermCache", userName)
}
}
func TestRecover(t *testing.T) {
@ -204,7 +211,8 @@ func TestUserDelete(t *testing.T) {
defer tearDown(t)
// delete an existing user
ud := &pb.AuthUserDeleteRequest{Name: "foo"}
const userName = "foo"
ud := &pb.AuthUserDeleteRequest{Name: userName}
_, err := as.UserDelete(ud)
if err != nil {
t.Fatal(err)
@ -218,6 +226,47 @@ func TestUserDelete(t *testing.T) {
if err != ErrUserNotFound {
t.Fatalf("expected %v, got %v", ErrUserNotFound, err)
}
if _, ok := as.rangePermCache[userName]; ok {
t.Fatalf("user %s should be deleted but it exists in rangePermCache", userName)
}
}
func TestUserDeleteAndPermCache(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)
// delete an existing user
const deletedUserName = "foo"
ud := &pb.AuthUserDeleteRequest{Name: deletedUserName}
_, err := as.UserDelete(ud)
if err != nil {
t.Fatal(err)
}
// delete a non-existing user
_, err = as.UserDelete(ud)
if err != ErrUserNotFound {
t.Fatalf("expected %v, got %v", ErrUserNotFound, err)
}
if _, ok := as.rangePermCache[deletedUserName]; ok {
t.Fatalf("user %s should be deleted but it exists in rangePermCache", deletedUserName)
}
// add a new user
const newUser = "bar"
ua := &pb.AuthUserAddRequest{Name: newUser, HashedPassword: encodePassword("pwd1"), Options: &authpb.UserAddOptions{NoPassword: false}}
_, err = as.UserAdd(ua)
if err != nil {
t.Fatal(err)
}
if _, ok := as.rangePermCache[newUser]; !ok {
t.Fatalf("user %s should exist but it doesn't exist in rangePermCache", deletedUserName)
}
}
func TestUserChangePassword(t *testing.T) {
@ -540,17 +589,44 @@ func TestUserRevokePermission(t *testing.T) {
t.Fatal(err)
}
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "foo", Role: "role-test"})
const userName = "foo"
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: userName, Role: "role-test"})
if err != nil {
t.Fatal(err)
}
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "foo", Role: "role-test-1"})
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: userName, Role: "role-test-1"})
if err != nil {
t.Fatal(err)
}
u, err := as.UserGet(&pb.AuthUserGetRequest{Name: "foo"})
perm := &authpb.Permission{
PermType: authpb.WRITE,
Key: []byte("WriteKeyBegin"),
RangeEnd: []byte("WriteKeyEnd"),
}
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "role-test-1",
Perm: perm,
})
if err != nil {
t.Fatal(err)
}
if _, ok := as.rangePermCache[userName]; !ok {
t.Fatalf("User %s should have its entry in rangePermCache", userName)
}
unifiedPerm := as.rangePermCache[userName]
pt1 := adt.NewBytesAffinePoint([]byte("WriteKeyBegin"))
if !unifiedPerm.writePerms.Contains(pt1) {
t.Fatal("rangePermCache should contain WriteKeyBegin")
}
pt2 := adt.NewBytesAffinePoint([]byte("OutOfRange"))
if unifiedPerm.writePerms.Contains(pt2) {
t.Fatal("rangePermCache should not contain OutOfRange")
}
u, err := as.UserGet(&pb.AuthUserGetRequest{Name: userName})
if err != nil {
t.Fatal(err)
}
@ -559,12 +635,12 @@ func TestUserRevokePermission(t *testing.T) {
assert.Equal(t, expected, u.Roles)
_, err = as.UserRevokeRole(&pb.AuthUserRevokeRoleRequest{Name: "foo", Role: "role-test-1"})
_, err = as.UserRevokeRole(&pb.AuthUserRevokeRoleRequest{Name: userName, Role: "role-test-1"})
if err != nil {
t.Fatal(err)
}
u, err = as.UserGet(&pb.AuthUserGetRequest{Name: "foo"})
u, err = as.UserGet(&pb.AuthUserGetRequest{Name: userName})
if err != nil {
t.Fatal(err)
}

View File

@ -111,6 +111,9 @@ func (atx *authBatchTx) Lock() {
func (atx *authBatchTx) Unlock() {
atx.tx.Unlock()
// Calling Commit() for defensive purpose. If the number of pending writes doesn't exceed batchLimit,
// ReadTx can miss some writes issued by its predecessor BatchTx.
atx.tx.Commit()
}
func (atx *authReadTx) UnsafeReadAuthEnabled() bool {

View File

@ -31,13 +31,6 @@ func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
return arx.UnsafeGetUser(username)
}
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetAllUsers()
@ -73,8 +66,23 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
return user
}
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1)
var vs [][]byte
err := atx.tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
vs = append(vs, v)
return nil
})
if err != nil {
atx.lg.Panic("failed to get users",
zap.Error(err))
}
if len(vs) == 0 {
return nil
}

View File

@ -113,7 +113,7 @@ func TestGetAllUsers(t *testing.T) {
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetAllUsers()
users := abe2.ReadTx().UnsafeGetAllUsers()
assert.Equal(t, tc.want, users)
})