Remove RLock/RUnlock from BatchTx

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz
2023-07-27 13:32:58 +02:00
parent b4f8a7be51
commit 29769984e6
17 changed files with 115 additions and 87 deletions

View File

@@ -79,7 +79,7 @@ func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) {
return s.unsafeGetAllAlarms(tx)
}
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.UnsafeReader) ([]*etcdserverpb.AlarmMember, error) {
var ms []*etcdserverpb.AlarmMember
err := tx.UnsafeForEach(Alarm, func(k, v []byte) error {
var m etcdserverpb.AlarmMember

View File

@@ -96,13 +96,11 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) {
}
func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeReadAuthEnabled()
return unsafeReadAuthEnabled(atx.tx)
}
func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeReadAuthRevision()
return unsafeReadAuthRevision(atx.tx)
}
func (atx *authBatchTx) Lock() {
@@ -117,7 +115,11 @@ func (atx *authBatchTx) Unlock() {
}
func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
_, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
return unsafeReadAuthEnabled(atx.tx)
}
func unsafeReadAuthEnabled(tx backend.UnsafeReader) bool {
_, vs := tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
return true
@@ -127,7 +129,11 @@ func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
}
func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
_, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
return unsafeReadAuthRevision(atx.tx)
}
func unsafeReadAuthRevision(tx backend.UnsafeReader) uint64 {
_, vs := tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0
@@ -135,10 +141,10 @@ func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
return binary.BigEndian.Uint64(vs[0])
}
func (atx *authReadTx) Lock() {
func (atx *authReadTx) RLock() {
atx.tx.RLock()
}
func (atx *authReadTx) Unlock() {
func (atx *authReadTx) RUnlock() {
atx.tx.RUnlock()
}

View File

@@ -26,15 +26,14 @@ func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) {
}
func (abe *authBackend) GetRole(roleName string) *authpb.Role {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := abe.ReadTx()
tx.RLock()
defer tx.RUnlock()
return tx.UnsafeGetRole(roleName)
}
func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetRole(roleName)
return unsafeGetRole(atx.lg, atx.tx, roleName)
}
func (abe *authBackend) GetAllRoles() []*authpb.Role {
@@ -45,8 +44,7 @@ func (abe *authBackend) GetAllRoles() []*authpb.Role {
}
func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetAllRoles()
return unsafeGetAllRoles(atx.lg, atx.tx)
}
func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) {
@@ -67,7 +65,11 @@ func (atx *authBatchTx) UnsafeDeleteRole(rolename string) {
}
func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role {
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
return unsafeGetRole(atx.lg, atx.tx, roleName)
}
func unsafeGetRole(lg *zap.Logger, tx backend.UnsafeReader, roleName string) *authpb.Role {
_, vs := tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
if len(vs) == 0 {
return nil
}
@@ -75,13 +77,17 @@ func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role {
role := &authpb.Role{}
err := role.Unmarshal(vs[0])
if err != nil {
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
}
return role
}
func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role {
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
return unsafeGetAllRoles(atx.lg, atx.tx)
}
func unsafeGetAllRoles(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.Role {
_, vs := tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
@@ -91,7 +97,7 @@ func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role {
role := &authpb.Role{}
err := role.Unmarshal(vs[i])
if err != nil {
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
}
roles[i] = role
}

View File

@@ -17,24 +17,24 @@ package schema
import (
"go.uber.org/zap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/api/v3/authpb"
)
func (abe *authBackend) GetUser(username string) *authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := abe.ReadTx()
tx.RLock()
defer tx.RUnlock()
return tx.UnsafeGetUser(username)
}
func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetUser(username)
return unsafeGetUser(atx.lg, atx.tx, username)
}
func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeGetAllUsers()
return unsafeGetAllUsers(atx.lg, atx.tx)
}
func (atx *authBatchTx) UnsafePutUser(user *authpb.User) {
@@ -50,7 +50,11 @@ func (atx *authBatchTx) UnsafeDeleteUser(username string) {
}
func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
return unsafeGetUser(atx.lg, atx.tx, username)
}
func unsafeGetUser(lg *zap.Logger, tx backend.UnsafeReader, username string) *authpb.User {
_, vs := tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
}
@@ -58,7 +62,7 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
user := &authpb.User{}
err := user.Unmarshal(vs[0])
if err != nil {
atx.lg.Panic(
lg.Panic(
"failed to unmarshal 'authpb.User'",
zap.String("user-name", username),
zap.Error(err),
@@ -68,20 +72,24 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
}
func (abe *authBackend) GetAllUsers() []*authpb.User {
tx := abe.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := abe.ReadTx()
tx.RLock()
defer tx.RUnlock()
return tx.UnsafeGetAllUsers()
}
func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
return unsafeGetAllUsers(atx.lg, atx.tx)
}
func unsafeGetAllUsers(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.User {
var vs [][]byte
err := atx.tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
err := tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
vs = append(vs, v)
return nil
})
if err != nil {
atx.lg.Panic("failed to get users",
lg.Panic("failed to get users",
zap.Error(err))
}
if len(vs) == 0 {
@@ -93,7 +101,7 @@ func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
user := &authpb.User{}
err := user.Unmarshal(vs[i])
if err != nil {
atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
}
users[i] = user
}

View File

@@ -37,7 +37,7 @@ func CreateMetaBucket(tx backend.BatchTx) {
// UnsafeReadConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
func UnsafeReadConsistentIndex(tx backend.UnsafeReader) (uint64, uint64) {
_, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0

View File

@@ -37,7 +37,7 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.UnsafeReader) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(Meta, MetaConfStateName, nil, 0)
if len(keys) == 0 {
return nil

View File

@@ -26,7 +26,7 @@ func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Lease)
}
func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
func MustUnsafeGetAllLeases(tx backend.UnsafeReader) []*leasepb.Lease {
ls := make([]*leasepb.Lease, 0)
err := tx.UnsafeForEach(Lease, func(k, v []byte) error {
var lpb leasepb.Lease

View File

@@ -32,7 +32,7 @@ func Validate(lg *zap.Logger, tx backend.ReadTx) error {
return unsafeValidate(lg, tx)
}
func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error {
func unsafeValidate(lg *zap.Logger, tx backend.UnsafeReader) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
// v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version.
@@ -91,7 +91,7 @@ func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, e
}
// UnsafeDetectSchemaVersion non-threadsafe version of DetectSchemaVersion.
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.UnsafeReader) (v semver.Version, err error) {
vp := UnsafeReadStorageVersion(tx)
if vp != nil {
return *vp, nil

View File

@@ -31,7 +31,7 @@ func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
// UnsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func UnsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
func UnsafeReadStorageVersion(tx backend.UnsafeReader) *semver.Version {
_, vs := tx.UnsafeRange(Meta, MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil