From 10b65c97dd749adca18485a03cf7bfee3b18790f Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 20 Aug 2017 22:06:03 -0700 Subject: [PATCH 1/2] mvcc: benchmark Range() on a single key --- mvcc/kvstore_bench_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index b0db47f11..92f3d6d50 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -45,6 +45,25 @@ func BenchmarkStorePut(b *testing.B) { } } +func BenchmarkStoreRangeOneKey(b *testing.B) { + var i fakeConsistentIndex + be, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(be, &lease.FakeLessor{}, &i) + defer cleanup(s, be, tmpPath) + + // 64 byte key/val + key, val := createBytesSlice(64, 1), createBytesSlice(64, 1) + s.Put(key[0], val[0], lease.NoLease) + // Force into boltdb tx instead of backend read tx. + s.Commit() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Range(key[0], nil, RangeOptions{}) + } +} + func BenchmarkConsistentIndex(b *testing.B) { fci := fakeConsistentIndex(10) be, tmpPath := backend.NewDefaultTmpBackend() From 8b872196d0a09a67aa3168d7afc97eb81ad4c739 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 21 Aug 2017 00:08:17 -0700 Subject: [PATCH 2/2] backend: cache buckets in read tx Saves an alloc and about 10% of Range() time. --- mvcc/backend/backend.go | 9 +++++--- mvcc/backend/batch_tx.go | 49 ++++++++++++---------------------------- mvcc/backend/read_tx.go | 37 ++++++++++++++++++++++++++---- 3 files changed, 53 insertions(+), 42 deletions(-) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 87edd25f4..42009dd47 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -139,8 +139,11 @@ func newBackend(bcfg BackendConfig) *backend { batchInterval: bcfg.BatchInterval, batchLimit: bcfg.BatchLimit, - readTx: &readTx{buf: txReadBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}}, + readTx: &readTx{ + buf: txReadBuffer{ + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + }, + buckets: make(map[string]*bolt.Bucket), }, stopc: make(chan struct{}), @@ -339,7 +342,7 @@ func (b *backend) defrag() error { plog.Fatalf("cannot begin tx (%s)", err) } - b.readTx.buf.reset() + b.readTx.reset() b.readTx.tx = b.unsafeBegin(false) atomic.StoreInt64(&b.size, b.readTx.tx.Size()) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index fed9d69c9..43ddc7188 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -16,7 +16,6 @@ package backend import ( "bytes" - "fmt" "math" "sync" "sync/atomic" @@ -45,13 +44,6 @@ type batchTx struct { pending int } -var nopLock sync.Locker = &nopLocker{} - -type nopLocker struct{} - -func (*nopLocker) Lock() {} -func (*nopLocker) Unlock() {} - func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { @@ -88,42 +80,32 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo // UnsafeRange must be called holding the lock on the tx. 
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { - // nop lock since a write txn should already hold a lock over t.tx - k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit, nopLock) - if err != nil { - plog.Fatal(err) + bucket := t.tx.Bucket(bucketName) + if bucket == nil { + plog.Fatalf("bucket %s does not exist", bucketName) } - return k, v + return unsafeRange(bucket.Cursor(), key, endKey, limit) } -func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64, l sync.Locker) (keys [][]byte, vs [][]byte, err error) { - l.Lock() - bucket := tx.Bucket(bucketName) - if bucket == nil { - l.Unlock() - return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName) - } - if len(endKey) == 0 { - v := bucket.Get(key) - l.Unlock() - if v != nil { - return append(keys, key), append(vs, v), nil - } - return nil, nil, nil - } - c := bucket.Cursor() - l.Unlock() +func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { if limit <= 0 { limit = math.MaxInt64 } - for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() { + var isMatch func(b []byte) bool + if len(endKey) > 0 { + isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 } + } else { + isMatch = func(b []byte) bool { return bytes.Equal(b, key) } + limit = 1 + } + for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() { vs = append(vs, cv) keys = append(keys, ck) if limit == int64(len(keys)) { break } } - return keys, vs, nil + return keys, vs } // UnsafeDelete must be called holding the lock on the tx. @@ -257,8 +239,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { if err := t.backend.readTx.tx.Rollback(); err != nil { plog.Fatalf("cannot rollback tx (%s)", err) } - t.backend.readTx.buf.reset() - t.backend.readTx.tx = nil + t.backend.readTx.reset() } t.batchTx.commit(stop) diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go index 9101cfd2a..0536de70e 100644 --- a/mvcc/backend/read_tx.go +++ b/mvcc/backend/read_tx.go @@ -40,9 +40,10 @@ type readTx struct { mu sync.RWMutex buf txReadBuffer - // txmu protects accesses to the Tx on Range requests - txmu sync.Mutex - tx *bolt.Tx + // txmu protects accesses to buckets and tx on Range requests. + txmu sync.RWMutex + tx *bolt.Tx + buckets map[string]*bolt.Bucket } func (rt *readTx) Lock() { rt.mu.RLock() } @@ -63,8 +64,28 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][] if int64(len(keys)) == limit { return keys, vals } - // ignore error since bucket may have been created in this batch - k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)), &rt.txmu) + + // find/cache bucket + bn := string(bucketName) + rt.txmu.RLock() + bucket, ok := rt.buckets[bn] + rt.txmu.RUnlock() + if !ok { + rt.txmu.Lock() + bucket = rt.tx.Bucket(bucketName) + rt.buckets[bn] = bucket + rt.txmu.Unlock() + } + + // ignore missing bucket since may have been created in this batch + if bucket == nil { + return keys, vals + } + rt.txmu.Lock() + c := bucket.Cursor() + rt.txmu.Unlock() + + k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) return append(k2, keys...), append(v2, vals...) } @@ -91,3 +112,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err } return rt.buf.ForEach(bucketName, visitor) } + +func (rt *readTx) reset() { + rt.buf.reset() + rt.buckets = make(map[string]*bolt.Bucket) + rt.tx = nil +}
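
Usage note: the benchmark added in the first patch can be run in isolation
with the standard Go tooling; one possible invocation from the repository
root is

    go test -run '^$' -bench BenchmarkStoreRangeOneKey ./mvcc

b.ReportAllocs() already turns on allocation reporting for this benchmark,
so comparing ns/op and allocs/op with and without the second patch applied
is what surfaces the saved alloc and the roughly 10% Range() improvement
quoted in its commit message.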
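To make the second patch's optimization concrete outside of etcd, below is a
minimal, self-contained sketch of the same bucket-handle caching pattern
against bolt. It assumes the github.com/boltdb/bolt import path (adjust to
whichever bolt fork is vendored); the cachedReader type, the demo.db file,
and the "keys" bucket are illustrative names, not etcd code. The sketch is
single-goroutine, so it omits the txmu locking that guards the shared map in
readTx:

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// cachedReader mimics the patched readTx: a long-lived read-only bolt
// transaction plus a map of bucket handles resolved against that tx.
type cachedReader struct {
	tx      *bolt.Tx
	buckets map[string]*bolt.Bucket
}

// bucket returns a cached *bolt.Bucket, falling back to tx.Bucket() on a
// miss. tx.Bucket() allocates on every call, so repeated reads of the same
// bucket save an alloc by reusing the handle. A handle is only valid for
// the lifetime of its tx, so the cache must be dropped whenever the tx is
// replaced (which is what readTx.reset() does in the patch).
func (r *cachedReader) bucket(name []byte) *bolt.Bucket {
	if b, ok := r.buckets[string(name)]; ok {
		return b
	}
	b := r.tx.Bucket(name) // may be nil; cache that too, like the patch
	r.buckets[string(name)] = b
	return b
}

func main() {
	db, err := bolt.Open("demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Seed one key so the read path below has something to find.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, berr := tx.CreateBucketIfNotExists([]byte("keys"))
		if berr != nil {
			return berr
		}
		return b.Put([]byte("foo"), []byte("bar"))
	}); err != nil {
		log.Fatal(err)
	}

	// Long-lived read-only tx, like backend.readTx between batch commits.
	tx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback()

	r := &cachedReader{tx: tx, buckets: make(map[string]*bolt.Bucket)}
	for i := 0; i < 3; i++ {
		// After the first lookup, reads reuse the cached handle.
		fmt.Printf("%s\n", r.bucket([]byte("keys")).Get([]byte("foo")))
	}
}

The same lifetime rule explains the readTx.reset() changes in the patch:
unsafeCommit() and defrag() both replace the underlying bolt tx, so the
bucket cache is cleared at exactly the points where its handles would go
stale.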