mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8425 from heyitsanthony/bench-get
mvcc: benchmark Range() on a single key
This commit is contained in:
commit
953c199b74
@ -139,8 +139,11 @@ func newBackend(bcfg BackendConfig) *backend {
|
||||
batchInterval: bcfg.BatchInterval,
|
||||
batchLimit: bcfg.BatchLimit,
|
||||
|
||||
readTx: &readTx{buf: txReadBuffer{
|
||||
txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
|
||||
readTx: &readTx{
|
||||
buf: txReadBuffer{
|
||||
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
|
||||
},
|
||||
buckets: make(map[string]*bolt.Bucket),
|
||||
},
|
||||
|
||||
stopc: make(chan struct{}),
|
||||
@ -339,7 +342,7 @@ func (b *backend) defrag() error {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
b.readTx.buf.reset()
|
||||
b.readTx.reset()
|
||||
b.readTx.tx = b.unsafeBegin(false)
|
||||
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
|
||||
|
||||
|
@ -16,7 +16,6 @@ package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -45,13 +44,6 @@ type batchTx struct {
|
||||
pending int
|
||||
}
|
||||
|
||||
var nopLock sync.Locker = &nopLocker{}
|
||||
|
||||
type nopLocker struct{}
|
||||
|
||||
func (*nopLocker) Lock() {}
|
||||
func (*nopLocker) Unlock() {}
|
||||
|
||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||
_, err := t.tx.CreateBucket(name)
|
||||
if err != nil && err != bolt.ErrBucketExists {
|
||||
@ -88,42 +80,32 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
||||
|
||||
// UnsafeRange must be called holding the lock on the tx.
|
||||
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
// nop lock since a write txn should already hold a lock over t.tx
|
||||
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit, nopLock)
|
||||
if err != nil {
|
||||
plog.Fatal(err)
|
||||
bucket := t.tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
plog.Fatalf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
return k, v
|
||||
return unsafeRange(bucket.Cursor(), key, endKey, limit)
|
||||
}
|
||||
|
||||
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64, l sync.Locker) (keys [][]byte, vs [][]byte, err error) {
|
||||
l.Lock()
|
||||
bucket := tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
l.Unlock()
|
||||
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
if len(endKey) == 0 {
|
||||
v := bucket.Get(key)
|
||||
l.Unlock()
|
||||
if v != nil {
|
||||
return append(keys, key), append(vs, v), nil
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
c := bucket.Cursor()
|
||||
l.Unlock()
|
||||
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
|
||||
if limit <= 0 {
|
||||
limit = math.MaxInt64
|
||||
}
|
||||
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
|
||||
var isMatch func(b []byte) bool
|
||||
if len(endKey) > 0 {
|
||||
isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
|
||||
} else {
|
||||
isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
|
||||
limit = 1
|
||||
}
|
||||
for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
|
||||
vs = append(vs, cv)
|
||||
keys = append(keys, ck)
|
||||
if limit == int64(len(keys)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return keys, vs, nil
|
||||
return keys, vs
|
||||
}
|
||||
|
||||
// UnsafeDelete must be called holding the lock on the tx.
|
||||
@ -257,8 +239,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||
}
|
||||
t.backend.readTx.buf.reset()
|
||||
t.backend.readTx.tx = nil
|
||||
t.backend.readTx.reset()
|
||||
}
|
||||
|
||||
t.batchTx.commit(stop)
|
||||
|
@ -40,9 +40,10 @@ type readTx struct {
|
||||
mu sync.RWMutex
|
||||
buf txReadBuffer
|
||||
|
||||
// txmu protects accesses to the Tx on Range requests
|
||||
txmu sync.Mutex
|
||||
tx *bolt.Tx
|
||||
// txmu protects accesses to buckets and tx on Range requests.
|
||||
txmu sync.RWMutex
|
||||
tx *bolt.Tx
|
||||
buckets map[string]*bolt.Bucket
|
||||
}
|
||||
|
||||
func (rt *readTx) Lock() { rt.mu.RLock() }
|
||||
@ -63,8 +64,28 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
|
||||
if int64(len(keys)) == limit {
|
||||
return keys, vals
|
||||
}
|
||||
// ignore error since bucket may have been created in this batch
|
||||
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)), &rt.txmu)
|
||||
|
||||
// find/cache bucket
|
||||
bn := string(bucketName)
|
||||
rt.txmu.RLock()
|
||||
bucket, ok := rt.buckets[bn]
|
||||
rt.txmu.RUnlock()
|
||||
if !ok {
|
||||
rt.txmu.Lock()
|
||||
bucket = rt.tx.Bucket(bucketName)
|
||||
rt.buckets[bn] = bucket
|
||||
rt.txmu.Unlock()
|
||||
}
|
||||
|
||||
// ignore missing bucket since may have been created in this batch
|
||||
if bucket == nil {
|
||||
return keys, vals
|
||||
}
|
||||
rt.txmu.Lock()
|
||||
c := bucket.Cursor()
|
||||
rt.txmu.Unlock()
|
||||
|
||||
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
|
||||
return append(k2, keys...), append(v2, vals...)
|
||||
}
|
||||
|
||||
@ -91,3 +112,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
|
||||
}
|
||||
return rt.buf.ForEach(bucketName, visitor)
|
||||
}
|
||||
|
||||
func (rt *readTx) reset() {
|
||||
rt.buf.reset()
|
||||
rt.buckets = make(map[string]*bolt.Bucket)
|
||||
rt.tx = nil
|
||||
}
|
||||
|
@ -45,6 +45,25 @@ func BenchmarkStorePut(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkStoreRangeOneKey(b *testing.B) {
|
||||
var i fakeConsistentIndex
|
||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := NewStore(be, &lease.FakeLessor{}, &i)
|
||||
defer cleanup(s, be, tmpPath)
|
||||
|
||||
// 64 byte key/val
|
||||
key, val := createBytesSlice(64, 1), createBytesSlice(64, 1)
|
||||
s.Put(key[0], val[0], lease.NoLease)
|
||||
// Force into boltdb tx instead of backend read tx.
|
||||
s.Commit()
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Range(key[0], nil, RangeOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkConsistentIndex(b *testing.B) {
|
||||
fci := fakeConsistentIndex(10)
|
||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||
|
Loading…
x
Reference in New Issue
Block a user