mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc/backend: rename ReadTx Lock() to RLock()
For better code readability, renaming Lock() to RLock() in ReadTx interface.
This commit is contained in:
parent
918f0414dd
commit
1c19f126cb
@ -336,8 +336,8 @@ func (b *backend) defrag() error {
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// block concurrent read requests while resetting tx
|
||||
b.readTx.mu.Lock()
|
||||
defer b.readTx.mu.Unlock()
|
||||
b.readTx.Lock()
|
||||
defer b.readTx.Unlock()
|
||||
|
||||
b.batchTx.unsafeCommit(true)
|
||||
|
||||
|
@ -45,6 +45,29 @@ type batchTx struct {
|
||||
pending int
|
||||
}
|
||||
|
||||
func (t *batchTx) Lock() {
|
||||
t.Mutex.Lock()
|
||||
}
|
||||
|
||||
func (t *batchTx) Unlock() {
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
}
|
||||
t.Mutex.Unlock()
|
||||
}
|
||||
|
||||
// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
|
||||
// have appropriate semantics in BatchTx interface. Therefore should not be called.
|
||||
// TODO: might want to decouple ReadTx and BatchTx
|
||||
|
||||
func (t *batchTx) RLock() {
|
||||
panic("unexpected RLock")
|
||||
}
|
||||
|
||||
func (t *batchTx) RUnlock() {
|
||||
panic("unexpected RUnlock")
|
||||
}
|
||||
|
||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||
_, err := t.tx.CreateBucket(name)
|
||||
if err != nil && err != bolt.ErrBucketExists {
|
||||
@ -194,13 +217,6 @@ func (t *batchTx) CommitAndStop() {
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTx) Unlock() {
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
}
|
||||
t.Mutex.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTx) safePending() int {
|
||||
t.Mutex.Lock()
|
||||
defer t.Mutex.Unlock()
|
||||
@ -259,9 +275,9 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
|
||||
|
||||
func (t *batchTxBuffered) Unlock() {
|
||||
if t.pending != 0 {
|
||||
t.backend.readTx.mu.Lock()
|
||||
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
|
||||
t.buf.writeback(&t.backend.readTx.buf)
|
||||
t.backend.readTx.mu.Unlock()
|
||||
t.backend.readTx.Unlock()
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
}
|
||||
@ -283,9 +299,9 @@ func (t *batchTxBuffered) CommitAndStop() {
|
||||
|
||||
func (t *batchTxBuffered) commit(stop bool) {
|
||||
// all read txs must be closed to acquire boltdb commit rwlock
|
||||
t.backend.readTx.mu.Lock()
|
||||
t.backend.readTx.Lock()
|
||||
t.unsafeCommit(stop)
|
||||
t.backend.readTx.mu.Unlock()
|
||||
t.backend.readTx.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
|
@ -30,6 +30,8 @@ var safeRangeBucket = []byte("key")
|
||||
type ReadTx interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
RLock()
|
||||
RUnlock()
|
||||
|
||||
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
|
||||
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
|
||||
@ -46,8 +48,10 @@ type readTx struct {
|
||||
buckets map[string]*bolt.Bucket
|
||||
}
|
||||
|
||||
func (rt *readTx) Lock() { rt.mu.RLock() }
|
||||
func (rt *readTx) Unlock() { rt.mu.RUnlock() }
|
||||
func (rt *readTx) Lock() { rt.mu.Lock() }
|
||||
func (rt *readTx) Unlock() { rt.mu.Unlock() }
|
||||
func (rt *readTx) RLock() { rt.mu.RLock() }
|
||||
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
|
||||
|
||||
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
if endKey == nil {
|
||||
|
@ -196,8 +196,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
|
||||
keep := s.kvindex.Keep(rev)
|
||||
|
||||
tx := s.b.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
s.mu.RUnlock()
|
||||
|
||||
upper := revision{main: rev + 1}
|
||||
|
@ -725,6 +725,8 @@ type fakeBatchTx struct {
|
||||
|
||||
func (b *fakeBatchTx) Lock() {}
|
||||
func (b *fakeBatchTx) Unlock() {}
|
||||
func (b *fakeBatchTx) RLock() {}
|
||||
func (b *fakeBatchTx) RUnlock() {}
|
||||
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
|
||||
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
|
||||
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
|
||||
|
@ -33,7 +33,11 @@ func (s *store) Read() TxnRead {
|
||||
s.mu.RLock()
|
||||
tx := s.b.ReadTx()
|
||||
s.revMu.RLock()
|
||||
tx.Lock()
|
||||
// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
|
||||
// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
|
||||
// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
|
||||
// and txReadBuffer needs to be reset.
|
||||
tx.RLock()
|
||||
firstRev, rev := s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
||||
@ -47,7 +51,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) End() {
|
||||
tr.tx.Unlock()
|
||||
tr.tx.RUnlock()
|
||||
tr.s.mu.RUnlock()
|
||||
}
|
||||
|
||||
|
@ -346,7 +346,7 @@ func (s *watchableStore) syncWatchers() int {
|
||||
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
|
||||
// values are actual key-value pairs in backend.
|
||||
tx := s.store.b.ReadTx()
|
||||
tx.Lock()
|
||||
tx.RLock()
|
||||
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
|
||||
var evs []mvccpb.Event
|
||||
if s.store != nil && s.store.lg != nil {
|
||||
@ -355,7 +355,7 @@ func (s *watchableStore) syncWatchers() int {
|
||||
// TODO: remove this in v3.5
|
||||
evs = kvsToEvents(nil, wg, revs, vs)
|
||||
}
|
||||
tx.Unlock()
|
||||
tx.RUnlock()
|
||||
|
||||
var victims watcherBatch
|
||||
wb := newWatcherBatch(wg, evs)
|
||||
|
Loading…
x
Reference in New Issue
Block a user