server/auth: protect rangePermCache with a RW lock

Signed-off-by: Hitoshi Mitake <h.mitake@gmail.com>
This commit is contained in:
Hitoshi Mitake
2022-04-17 23:06:30 +09:00
parent 71bba3c761
commit de09174a3f
6 changed files with 153 additions and 51 deletions

View File

@@ -111,6 +111,9 @@ func (atx *authBatchTx) Lock() {
func (atx *authBatchTx) Unlock() {
atx.tx.Unlock()
// Calling Commit() for defensive purpose. If the number of pending writes doesn't exceed batchLimit,
// ReadTx can miss some writes issued by its predecessor BatchTx.
atx.tx.Commit()
}
func (atx *authReadTx) UnsafeReadAuthEnabled() bool {

View File

@@ -31,13 +31,6 @@ func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
return arx.UnsafeGetUser(username)
}
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetAllUsers()
@@ -73,8 +66,23 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
return user
}
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1)
var vs [][]byte
err := atx.tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
vs = append(vs, v)
return nil
})
if err != nil {
atx.lg.Panic("failed to get users",
zap.Error(err))
}
if len(vs) == 0 {
return nil
}

View File

@@ -113,7 +113,7 @@ func TestGetAllUsers(t *testing.T) {
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetAllUsers()
users := abe2.ReadTx().UnsafeGetAllUsers()
assert.Equal(t, tc.want, users)
})