mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16320 from serathius/rlock2
Remove RLock/RUnlock from BatchTx
This commit is contained in:
commit
3b708df1e0
@ -21,7 +21,7 @@ import (
|
||||
"go.etcd.io/etcd/pkg/v3/adt"
|
||||
)
|
||||
|
||||
func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions {
|
||||
func getMergedPerms(tx UnsafeAuthReader, userName string) *unifiedRangePermissions {
|
||||
user := tx.UnsafeGetUser(userName)
|
||||
if user == nil {
|
||||
return nil
|
||||
@ -127,7 +127,7 @@ func (as *authStore) isRangeOpPermitted(userName string, key, rangeEnd []byte, p
|
||||
return checkKeyInterval(as.lg, rangePerm, key, rangeEnd, permtyp)
|
||||
}
|
||||
|
||||
func (as *authStore) refreshRangePermCache(tx AuthReadTx) {
|
||||
func (as *authStore) refreshRangePermCache(tx UnsafeAuthReader) {
|
||||
// Note that every authentication configuration update calls this method and it invalidates the entire
|
||||
// rangePermCache and reconstruct it based on information of users and roles stored in the backend.
|
||||
// This can be a costly operation.
|
||||
|
@ -206,7 +206,9 @@ type AuthBackend interface {
|
||||
}
|
||||
|
||||
type AuthBatchTx interface {
|
||||
AuthReadTx
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeAuthReader
|
||||
UnsafeSaveAuthEnabled(enabled bool)
|
||||
UnsafeSaveAuthRevision(rev uint64)
|
||||
UnsafePutUser(*authpb.User)
|
||||
@ -216,14 +218,18 @@ type AuthBatchTx interface {
|
||||
}
|
||||
|
||||
type AuthReadTx interface {
|
||||
RLock()
|
||||
RUnlock()
|
||||
UnsafeAuthReader
|
||||
}
|
||||
|
||||
type UnsafeAuthReader interface {
|
||||
UnsafeReadAuthEnabled() bool
|
||||
UnsafeReadAuthRevision() uint64
|
||||
UnsafeGetUser(string) *authpb.User
|
||||
UnsafeGetRole(string) *authpb.Role
|
||||
UnsafeGetAllUsers() []*authpb.User
|
||||
UnsafeGetAllRoles() []*authpb.Role
|
||||
Lock()
|
||||
Unlock()
|
||||
}
|
||||
|
||||
type authStore struct {
|
||||
@ -354,8 +360,8 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
// to avoid putting it in the critical section of the tx lock.
|
||||
revision, err := func() (uint64, error) {
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
|
||||
user = tx.UnsafeGetUser(username)
|
||||
if user == nil {
|
||||
@ -382,13 +388,13 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
func (as *authStore) Recover(be AuthBackend) {
|
||||
as.be = be
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
tx.RLock()
|
||||
|
||||
enabled := tx.UnsafeReadAuthEnabled()
|
||||
as.setRevision(tx.UnsafeReadAuthRevision())
|
||||
as.refreshRangePermCache(tx)
|
||||
|
||||
tx.Unlock()
|
||||
tx.RUnlock()
|
||||
|
||||
as.enabledMu.Lock()
|
||||
as.enabled = enabled
|
||||
@ -864,8 +870,8 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
|
||||
}
|
||||
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
|
||||
user := tx.UnsafeGetUser(userName)
|
||||
if user == nil {
|
||||
@ -906,8 +912,8 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
|
||||
}
|
||||
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
u := tx.UnsafeGetUser(authInfo.Username)
|
||||
|
||||
if u == nil {
|
||||
|
@ -106,6 +106,12 @@ func (t txMock) Lock() {
|
||||
func (t txMock) Unlock() {
|
||||
}
|
||||
|
||||
func (t txMock) RLock() {
|
||||
}
|
||||
|
||||
func (t txMock) RUnlock() {
|
||||
}
|
||||
|
||||
func (t txMock) UnsafeSaveAuthEnabled(enabled bool) {
|
||||
t.be.enabled = enabled
|
||||
}
|
||||
|
@ -44,9 +44,9 @@ type Bucket interface {
|
||||
}
|
||||
|
||||
type BatchTx interface {
|
||||
ReadTx
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeReader
|
||||
UnsafeCreateBucket(bucket Bucket)
|
||||
UnsafeDeleteBucket(bucket Bucket)
|
||||
UnsafePut(bucket Bucket, key []byte, value []byte)
|
||||
@ -103,18 +103,6 @@ func (t *batchTx) Unlock() {
|
||||
t.Mutex.Unlock()
|
||||
}
|
||||
|
||||
// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
|
||||
// have appropriate semantics in BatchTx interface. Therefore should not be called.
|
||||
// TODO: might want to decouple ReadTx and BatchTx
|
||||
|
||||
func (t *batchTx) RLock() {
|
||||
panic("unexpected RLock")
|
||||
}
|
||||
|
||||
func (t *batchTx) RUnlock() {
|
||||
panic("unexpected RUnlock")
|
||||
}
|
||||
|
||||
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
|
||||
_, err := t.tx.CreateBucket(bucket.Name())
|
||||
if err != nil && err != bolt.ErrBucketExists {
|
||||
|
@ -28,7 +28,10 @@ import (
|
||||
type ReadTx interface {
|
||||
RLock()
|
||||
RUnlock()
|
||||
UnsafeReader
|
||||
}
|
||||
|
||||
type UnsafeReader interface {
|
||||
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
|
||||
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ const (
|
||||
hashStorageMaxSize = 10
|
||||
)
|
||||
|
||||
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
||||
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
||||
h := newKVHasher(compactRevision, revision, keep)
|
||||
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
|
||||
h.WriteKeyValue(k, v)
|
||||
|
@ -28,8 +28,13 @@ import (
|
||||
)
|
||||
|
||||
type storeTxnRead struct {
|
||||
s *store
|
||||
storeTxnCommon
|
||||
tx backend.ReadTx
|
||||
}
|
||||
|
||||
type storeTxnCommon struct {
|
||||
s *store
|
||||
tx backend.UnsafeReader
|
||||
|
||||
firstRev int64
|
||||
rev int64
|
||||
@ -54,17 +59,17 @@ func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
|
||||
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
||||
firstRev, rev := s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
|
||||
return newMetricsTxnRead(&storeTxnRead{storeTxnCommon{s, tx, firstRev, rev, trace}, tx})
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
||||
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
|
||||
func (tr *storeTxnCommon) FirstRev() int64 { return tr.firstRev }
|
||||
func (tr *storeTxnCommon) Rev() int64 { return tr.rev }
|
||||
|
||||
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
func (tr *storeTxnCommon) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||
func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||
rev := ro.Rev
|
||||
if rev > curRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||
@ -132,7 +137,7 @@ func (tr *storeTxnRead) End() {
|
||||
}
|
||||
|
||||
type storeTxnWrite struct {
|
||||
storeTxnRead
|
||||
storeTxnCommon
|
||||
tx backend.BatchTx
|
||||
// beginRev is the revision where the txn begins; it will write to the next revision.
|
||||
beginRev int64
|
||||
@ -144,10 +149,10 @@ func (s *store) Write(trace *traceutil.Trace) TxnWrite {
|
||||
tx := s.b.BatchTx()
|
||||
tx.LockInsideApply()
|
||||
tw := &storeTxnWrite{
|
||||
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
|
||||
tx: tx,
|
||||
beginRev: s.currentRev,
|
||||
changes: make([]mvccpb.KeyValue, 0, 4),
|
||||
storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
|
||||
tx: tx,
|
||||
beginRev: s.currentRev,
|
||||
changes: make([]mvccpb.KeyValue, 0, 4),
|
||||
}
|
||||
return newMetricsTxnWrite(tw)
|
||||
}
|
||||
@ -217,7 +222,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
tw.storeTxnRead.s.lg.Fatal(
|
||||
tw.storeTxnCommon.s.lg.Fatal(
|
||||
"failed to marshal mvccpb.KeyValue",
|
||||
zap.Error(err),
|
||||
)
|
||||
@ -240,7 +245,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
}
|
||||
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
tw.storeTxnRead.s.lg.Error(
|
||||
tw.storeTxnCommon.s.lg.Error(
|
||||
"failed to detach old lease from a key",
|
||||
zap.Error(err),
|
||||
)
|
||||
@ -278,13 +283,13 @@ func (tw *storeTxnWrite) delete(key []byte) {
|
||||
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
|
||||
revToBytes(idxRev, ibytes)
|
||||
|
||||
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
|
||||
ibytes = appendMarkTombstone(tw.storeTxnCommon.s.lg, ibytes)
|
||||
|
||||
kv := mvccpb.KeyValue{Key: key}
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
tw.storeTxnRead.s.lg.Fatal(
|
||||
tw.storeTxnCommon.s.lg.Fatal(
|
||||
"failed to marshal mvccpb.KeyValue",
|
||||
zap.Error(err),
|
||||
)
|
||||
@ -293,7 +298,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
|
||||
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
|
||||
err = tw.s.kvindex.Tombstone(key, idxRev)
|
||||
if err != nil {
|
||||
tw.storeTxnRead.s.lg.Fatal(
|
||||
tw.storeTxnCommon.s.lg.Fatal(
|
||||
"failed to tombstone an existing key",
|
||||
zap.String("key", string(key)),
|
||||
zap.Error(err),
|
||||
@ -307,7 +312,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
|
||||
if leaseID != lease.NoLease {
|
||||
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
|
||||
if err != nil {
|
||||
tw.storeTxnRead.s.lg.Error(
|
||||
tw.storeTxnCommon.s.lg.Error(
|
||||
"failed to detach old lease from a key",
|
||||
zap.Error(err),
|
||||
)
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||
)
|
||||
|
||||
func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
|
||||
func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) {
|
||||
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
return bytesToRev(finishedCompactBytes[0]).main, true
|
||||
@ -27,7 +27,7 @@ func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found b
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) {
|
||||
func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) {
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
return bytesToRev(scheduledCompactBytes[0]).main, true
|
||||
|
@ -79,7 +79,7 @@ func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) {
|
||||
return s.unsafeGetAllAlarms(tx)
|
||||
}
|
||||
|
||||
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
|
||||
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.UnsafeReader) ([]*etcdserverpb.AlarmMember, error) {
|
||||
var ms []*etcdserverpb.AlarmMember
|
||||
err := tx.UnsafeForEach(Alarm, func(k, v []byte) error {
|
||||
var m etcdserverpb.AlarmMember
|
||||
|
@ -96,13 +96,11 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) {
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeReadAuthEnabled()
|
||||
return unsafeReadAuthEnabled(atx.tx)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeReadAuthRevision()
|
||||
return unsafeReadAuthRevision(atx.tx)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) Lock() {
|
||||
@ -117,7 +115,11 @@ func (atx *authBatchTx) Unlock() {
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
|
||||
_, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
|
||||
return unsafeReadAuthEnabled(atx.tx)
|
||||
}
|
||||
|
||||
func unsafeReadAuthEnabled(tx backend.UnsafeReader) bool {
|
||||
_, vs := tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
|
||||
if len(vs) == 1 {
|
||||
if bytes.Equal(vs[0], authEnabled) {
|
||||
return true
|
||||
@ -127,7 +129,11 @@ func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
|
||||
_, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
|
||||
return unsafeReadAuthRevision(atx.tx)
|
||||
}
|
||||
|
||||
func unsafeReadAuthRevision(tx backend.UnsafeReader) uint64 {
|
||||
_, vs := tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
|
||||
if len(vs) != 1 {
|
||||
// this can happen in the initialization phase
|
||||
return 0
|
||||
@ -135,10 +141,10 @@ func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
|
||||
return binary.BigEndian.Uint64(vs[0])
|
||||
}
|
||||
|
||||
func (atx *authReadTx) Lock() {
|
||||
func (atx *authReadTx) RLock() {
|
||||
atx.tx.RLock()
|
||||
}
|
||||
|
||||
func (atx *authReadTx) Unlock() {
|
||||
func (atx *authReadTx) RUnlock() {
|
||||
atx.tx.RUnlock()
|
||||
}
|
||||
|
@ -26,15 +26,14 @@ func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) {
|
||||
}
|
||||
|
||||
func (abe *authBackend) GetRole(roleName string) *authpb.Role {
|
||||
tx := abe.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx := abe.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
return tx.UnsafeGetRole(roleName)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeGetRole(roleName)
|
||||
return unsafeGetRole(atx.lg, atx.tx, roleName)
|
||||
}
|
||||
|
||||
func (abe *authBackend) GetAllRoles() []*authpb.Role {
|
||||
@ -45,8 +44,7 @@ func (abe *authBackend) GetAllRoles() []*authpb.Role {
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeGetAllRoles()
|
||||
return unsafeGetAllRoles(atx.lg, atx.tx)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) {
|
||||
@ -67,7 +65,11 @@ func (atx *authBatchTx) UnsafeDeleteRole(rolename string) {
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role {
|
||||
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
|
||||
return unsafeGetRole(atx.lg, atx.tx, roleName)
|
||||
}
|
||||
|
||||
func unsafeGetRole(lg *zap.Logger, tx backend.UnsafeReader, roleName string) *authpb.Role {
|
||||
_, vs := tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -75,13 +77,17 @@ func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role {
|
||||
role := &authpb.Role{}
|
||||
err := role.Unmarshal(vs[0])
|
||||
if err != nil {
|
||||
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
|
||||
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
|
||||
}
|
||||
return role
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role {
|
||||
_, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
|
||||
return unsafeGetAllRoles(atx.lg, atx.tx)
|
||||
}
|
||||
|
||||
func unsafeGetAllRoles(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.Role {
|
||||
_, vs := tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -91,7 +97,7 @@ func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role {
|
||||
role := &authpb.Role{}
|
||||
err := role.Unmarshal(vs[i])
|
||||
if err != nil {
|
||||
atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
|
||||
lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
|
||||
}
|
||||
roles[i] = role
|
||||
}
|
||||
|
@ -17,24 +17,24 @@ package schema
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/authpb"
|
||||
)
|
||||
|
||||
func (abe *authBackend) GetUser(username string) *authpb.User {
|
||||
tx := abe.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx := abe.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
return tx.UnsafeGetUser(username)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeGetUser(username)
|
||||
return unsafeGetUser(atx.lg, atx.tx, username)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
|
||||
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
|
||||
return arx.UnsafeGetAllUsers()
|
||||
return unsafeGetAllUsers(atx.lg, atx.tx)
|
||||
}
|
||||
|
||||
func (atx *authBatchTx) UnsafePutUser(user *authpb.User) {
|
||||
@ -50,7 +50,11 @@ func (atx *authBatchTx) UnsafeDeleteUser(username string) {
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
|
||||
_, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
|
||||
return unsafeGetUser(atx.lg, atx.tx, username)
|
||||
}
|
||||
|
||||
func unsafeGetUser(lg *zap.Logger, tx backend.UnsafeReader, username string) *authpb.User {
|
||||
_, vs := tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -58,7 +62,7 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
|
||||
user := &authpb.User{}
|
||||
err := user.Unmarshal(vs[0])
|
||||
if err != nil {
|
||||
atx.lg.Panic(
|
||||
lg.Panic(
|
||||
"failed to unmarshal 'authpb.User'",
|
||||
zap.String("user-name", username),
|
||||
zap.Error(err),
|
||||
@ -68,20 +72,24 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
|
||||
}
|
||||
|
||||
func (abe *authBackend) GetAllUsers() []*authpb.User {
|
||||
tx := abe.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx := abe.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
return tx.UnsafeGetAllUsers()
|
||||
}
|
||||
|
||||
func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
|
||||
return unsafeGetAllUsers(atx.lg, atx.tx)
|
||||
}
|
||||
|
||||
func unsafeGetAllUsers(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.User {
|
||||
var vs [][]byte
|
||||
err := atx.tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
|
||||
err := tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error {
|
||||
vs = append(vs, v)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
atx.lg.Panic("failed to get users",
|
||||
lg.Panic("failed to get users",
|
||||
zap.Error(err))
|
||||
}
|
||||
if len(vs) == 0 {
|
||||
@ -93,7 +101,7 @@ func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
|
||||
user := &authpb.User{}
|
||||
err := user.Unmarshal(vs[i])
|
||||
if err != nil {
|
||||
atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
|
||||
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
|
||||
}
|
||||
users[i] = user
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ func CreateMetaBucket(tx backend.BatchTx) {
|
||||
// UnsafeReadConsistentIndex loads consistent index & term from given transaction.
|
||||
// returns 0,0 if the data are not found.
|
||||
// Term is persisted since v3.5.
|
||||
func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
func UnsafeReadConsistentIndex(tx backend.UnsafeReader) (uint64, uint64) {
|
||||
_, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return 0, 0
|
||||
|
@ -37,7 +37,7 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
|
||||
|
||||
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
|
||||
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
|
||||
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
|
||||
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.UnsafeReader) *raftpb.ConfState {
|
||||
keys, vals := tx.UnsafeRange(Meta, MetaConfStateName, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
|
@ -26,7 +26,7 @@ func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
|
||||
tx.UnsafeCreateBucket(Lease)
|
||||
}
|
||||
|
||||
func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
|
||||
func MustUnsafeGetAllLeases(tx backend.UnsafeReader) []*leasepb.Lease {
|
||||
ls := make([]*leasepb.Lease, 0)
|
||||
err := tx.UnsafeForEach(Lease, func(k, v []byte) error {
|
||||
var lpb leasepb.Lease
|
||||
|
@ -32,7 +32,7 @@ func Validate(lg *zap.Logger, tx backend.ReadTx) error {
|
||||
return unsafeValidate(lg, tx)
|
||||
}
|
||||
|
||||
func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error {
|
||||
func unsafeValidate(lg *zap.Logger, tx backend.UnsafeReader) error {
|
||||
current, err := UnsafeDetectSchemaVersion(lg, tx)
|
||||
if err != nil {
|
||||
// v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version.
|
||||
@ -91,7 +91,7 @@ func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, e
|
||||
}
|
||||
|
||||
// UnsafeDetectSchemaVersion non-threadsafe version of DetectSchemaVersion.
|
||||
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
|
||||
func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.UnsafeReader) (v semver.Version, err error) {
|
||||
vp := UnsafeReadStorageVersion(tx)
|
||||
if vp != nil {
|
||||
return *vp, nil
|
||||
|
@ -31,7 +31,7 @@ func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||
|
||||
// UnsafeReadStorageVersion loads storage version from given backend transaction.
|
||||
// Populated since v3.6
|
||||
func UnsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
|
||||
func UnsafeReadStorageVersion(tx backend.UnsafeReader) *semver.Version {
|
||||
_, vs := tx.UnsafeRange(Meta, MetaStorageVersionName, nil, 1)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user