diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index f945dd589..bf00dad79 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -170,11 +170,14 @@ func newBackend(bcfg BackendConfig) *backend {
 		batchLimit: bcfg.BatchLimit,
 
 		readTx: &readTx{
-			buf: txReadBuffer{
-				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			baseReadTx: baseReadTx{
+				buf: txReadBuffer{
+					txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+				},
+				buckets: make(map[string]*bolt.Bucket),
+				txWg:    new(sync.WaitGroup),
+				txMu:    new(sync.RWMutex),
 			},
-			buckets: make(map[string]*bolt.Bucket),
-			txWg:    new(sync.WaitGroup),
 		},
 
 		stopc: make(chan struct{}),
@@ -206,11 +209,13 @@ func (b *backend) ConcurrentReadTx() ReadTx {
 	b.readTx.txWg.Add(1)
 	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
 	return &concurrentReadTx{
-		buf:     b.readTx.buf.unsafeCopy(),
-		tx:      b.readTx.tx,
-		txMu:    &b.readTx.txMu,
-		buckets: b.readTx.buckets,
-		txWg:    b.readTx.txWg,
+		baseReadTx: baseReadTx{
+			buf:     b.readTx.buf.unsafeCopy(),
+			txMu:    b.readTx.txMu,
+			tx:      b.readTx.tx,
+			buckets: b.readTx.buckets,
+			txWg:    b.readTx.txWg,
+		},
 	}
 }
 
diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go
index fcca9f089..1ff0ba188 100644
--- a/mvcc/backend/read_tx.go
+++ b/mvcc/backend/read_tx.go
@@ -37,73 +37,22 @@ type ReadTx interface {
 	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
 }
 
-type readTx struct {
+// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
+type baseReadTx struct {
 	// mu protects accesses to the txReadBuffer
 	mu  sync.RWMutex
 	buf txReadBuffer
 
 	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
 	// txMu protects accesses to buckets and tx on Range requests.
-	txMu    sync.RWMutex
+	txMu    *sync.RWMutex
 	tx      *bolt.Tx
 	buckets map[string]*bolt.Bucket
 	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
 	txWg *sync.WaitGroup
 }
 
-func (rt *readTx) Lock()    { rt.mu.Lock() }
-func (rt *readTx) Unlock()  { rt.mu.Unlock() }
-func (rt *readTx) RLock()   { rt.mu.RLock() }
-func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
-
-func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
-	if endKey == nil {
-		// forbid duplicates for single keys
-		limit = 1
-	}
-	if limit <= 0 {
-		limit = math.MaxInt64
-	}
-	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
-		panic("do not use unsafeRange on non-keys bucket")
-	}
-	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
-	if int64(len(keys)) == limit {
-		return keys, vals
-	}
-
-	// find/cache bucket
-	bn := string(bucketName)
-	rt.txMu.RLock()
-	bucket, ok := rt.buckets[bn]
-	rt.txMu.RUnlock()
-	lockHeld := false
-	if !ok {
-		rt.txMu.Lock()
-		lockHeld = true
-		bucket = rt.tx.Bucket(bucketName)
-		rt.buckets[bn] = bucket
-	}
-
-	// ignore missing bucket since may have been created in this batch
-	if bucket == nil {
-		if lockHeld {
-			rt.txMu.Unlock()
-		}
-		return keys, vals
-	}
-	if !lockHeld {
-		rt.txMu.Lock()
-		lockHeld = true
-	}
-	c := bucket.Cursor()
-	rt.txMu.Unlock()
-
-	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
-	return append(k2, keys...), append(v2, vals...)
-}
-
-func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
 	dups := make(map[string]struct{})
 	getDups := func(k, v []byte) error {
 		dups[string(k)] = struct{}{}
 		return nil
 	}
@@ -115,18 +64,74 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
 		}
 		return visitor(k, v)
 	}
-	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+	if err := baseReadTx.buf.ForEach(bucketName, getDups); err != nil {
 		return err
 	}
-	rt.txMu.Lock()
-	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
-	rt.txMu.Unlock()
+	baseReadTx.txMu.Lock()
+	err := unsafeForEach(baseReadTx.tx, bucketName, visitNoDup)
+	baseReadTx.txMu.Unlock()
 	if err != nil {
 		return err
 	}
-	return rt.buf.ForEach(bucketName, visitor)
+	return baseReadTx.buf.ForEach(bucketName, visitor)
 }
 
+func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit {
+		return keys, vals
+	}
+
+	// find/cache bucket
+	bn := string(bucketName)
+	baseReadTx.txMu.RLock()
+	bucket, ok := baseReadTx.buckets[bn]
+	baseReadTx.txMu.RUnlock()
+	lockHeld := false
+	if !ok {
+		baseReadTx.txMu.Lock()
+		lockHeld = true
+		bucket = baseReadTx.tx.Bucket(bucketName)
+		baseReadTx.buckets[bn] = bucket
+	}
+
+	// ignore missing bucket since may have been created in this batch
+	if bucket == nil {
+		if lockHeld {
+			baseReadTx.txMu.Unlock()
+		}
+		return keys, vals
+	}
+	if !lockHeld {
+		baseReadTx.txMu.Lock()
+		lockHeld = true
+	}
+	c := bucket.Cursor()
+	baseReadTx.txMu.Unlock()
+
+	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+	return append(k2, keys...), append(v2, vals...)
+}
+
+type readTx struct {
+	baseReadTx
+}
+
+func (rt *readTx) Lock()    { rt.mu.Lock() }
+func (rt *readTx) Unlock()  { rt.mu.Unlock() }
+func (rt *readTx) RLock()   { rt.mu.RLock() }
+func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
+
 func (rt *readTx) reset() {
 	rt.buf.reset()
 	rt.buckets = make(map[string]*bolt.Bucket)
@@ -134,13 +139,8 @@ func (rt *readTx) reset() {
 	rt.txWg = new(sync.WaitGroup)
 }
 
-// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
 type concurrentReadTx struct {
-	buf     txReadBuffer
-	txMu    *sync.RWMutex
-	tx      *bolt.Tx
-	buckets map[string]*bolt.Bucket
-	txWg    *sync.WaitGroup
+	baseReadTx
 }
 
 func (rt *concurrentReadTx) Lock()   {}
@@ -151,67 +151,3 @@ func (rt *concurrentReadTx) RLock() {}
 
 // RUnlock signals the end of concurrentReadTx.
 func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
-
-func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	dups := make(map[string]struct{})
-	getDups := func(k, v []byte) error {
-		dups[string(k)] = struct{}{}
-		return nil
-	}
-	visitNoDup := func(k, v []byte) error {
-		if _, ok := dups[string(k)]; ok {
-			return nil
-		}
-		return visitor(k, v)
-	}
-	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
-		return err
-	}
-	rt.txMu.Lock()
-	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
-	rt.txMu.Unlock()
-	if err != nil {
-		return err
-	}
-	return rt.buf.ForEach(bucketName, visitor)
-}
-
-func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
-	if endKey == nil {
-		// forbid duplicates for single keys
-		limit = 1
-	}
-	if limit <= 0 {
-		limit = math.MaxInt64
-	}
-	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
-		panic("do not use unsafeRange on non-keys bucket")
-	}
-	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
-	if int64(len(keys)) == limit {
-		return keys, vals
-	}
-
-	// find/cache bucket
-	bn := string(bucketName)
-	rt.txMu.RLock()
-	bucket, ok := rt.buckets[bn]
-	rt.txMu.RUnlock()
-	if !ok {
-		rt.txMu.Lock()
-		bucket = rt.tx.Bucket(bucketName)
-		rt.buckets[bn] = bucket
-		rt.txMu.Unlock()
-	}
-
-	// ignore missing bucket since may have been created in this batch
-	if bucket == nil {
-		return keys, vals
-	}
-	rt.txMu.Lock()
-	c := bucket.Cursor()
-	rt.txMu.Unlock()
-
-	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
-	return append(k2, keys...), append(v2, vals...)
-}
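
For context, the refactor above relies on Go struct embedding: readTx and concurrentReadTx both embed baseReadTx, so UnsafeRange and UnsafeForEach are written once and promoted onto both types, while each wrapper keeps its own Lock/Unlock/RLock/RUnlock behaviour. Below is a minimal, self-contained sketch of that pattern only; the names baseTx, exclusiveTx, and concurrentTx are illustrative and not part of etcd.

package main

import (
	"fmt"
	"sync"
)

// baseTx holds the state and read method shared by both transaction
// flavours, playing the role of baseReadTx in the patch.
type baseTx struct {
	txMu *sync.RWMutex
	data map[string]string
}

// Get is defined once on the base type and promoted onto every type that
// embeds it, the way UnsafeRange/UnsafeForEach are shared after the refactor.
func (b *baseTx) Get(key string) string {
	b.txMu.RLock()
	defer b.txMu.RUnlock()
	return b.data[key]
}

// exclusiveTx adds its own reader lock, loosely mirroring readTx.
type exclusiveTx struct {
	baseTx
	mu sync.RWMutex
}

func (t *exclusiveTx) RLock()   { t.mu.RLock() }
func (t *exclusiveTx) RUnlock() { t.mu.RUnlock() }

// concurrentTx only tracks outstanding readers, loosely mirroring concurrentReadTx.
type concurrentTx struct {
	baseTx
	wg *sync.WaitGroup
}

func (t *concurrentTx) RLock()   {}              // nothing to lock per reader
func (t *concurrentTx) RUnlock() { t.wg.Done() } // signal that this reader is done

func main() {
	shared := baseTx{txMu: new(sync.RWMutex), data: map[string]string{"k": "v"}}

	ex := &exclusiveTx{baseTx: shared}
	ex.RLock()
	fmt.Println(ex.Get("k")) // Get comes from the embedded baseTx
	ex.RUnlock()

	wg := new(sync.WaitGroup)
	wg.Add(1)
	co := &concurrentTx{baseTx: shared, wg: wg}
	co.RLock()
	fmt.Println(co.Get("k")) // same shared implementation, different lock semantics
	co.RUnlock()
	wg.Wait()
}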