Represent bucket as object instead of []byte name.

Thanks to this change:
  - all the maps bucket -> buffer are indexed by int's instead of
string. No need to do: byte[] -> string -> hash conversion on each
access.
  - buckets are strongly typed in backend/mvcc API.
This commit is contained in:
Piotr Tabor
2021-05-17 23:05:27 +02:00
parent 8bddbdc1d6
commit e6baf6d751
25 changed files with 359 additions and 289 deletions

View File

@@ -53,7 +53,7 @@ type Backend interface {
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
@@ -194,10 +194,10 @@ func newBackend(bcfg BackendConfig) *backend {
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[string]*bolt.Bucket),
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
@@ -358,12 +358,7 @@ func (b *backend) Snapshot() Snapshot {
return &snapshot{tx, stopc, donec}
}
type IgnoreKey struct {
Bucket string
Key string
}
func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
b.mu.RLock()
@@ -377,8 +372,7 @@ func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
}
h.Write(next)
b.ForEach(func(k, v []byte) error {
bk := IgnoreKey{Bucket: string(next), Key: string(k)}
if _, ok := ignores[bk]; !ok {
if ignores != nil && !ignores(next, k) {
h.Write(k)
h.Write(v)
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func BenchmarkBackendPut(b *testing.B) {
@@ -41,13 +42,13 @@ func BenchmarkBackendPut(b *testing.B) {
batchTx := backend.BatchTx()
batchTx.Lock()
batchTx.UnsafeCreateBucket([]byte("test"))
batchTx.UnsafeCreateBucket(buckets.Test)
batchTx.Unlock()
b.ResetTimer()
for i := 0; i < b.N; i++ {
batchTx.Lock()
batchTx.UnsafePut([]byte("test"), keys[i], value)
batchTx.UnsafePut(buckets.Test, keys[i], value)
batchTx.Unlock()
}
}

View File

@@ -25,6 +25,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func TestBackendClose(t *testing.T) {
@@ -52,8 +53,8 @@ func TestBackendSnapshot(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
@@ -77,7 +78,7 @@ func TestBackendSnapshot(t *testing.T) {
newTx := nb.BatchTx()
newTx.Lock()
ks, _ := newTx.UnsafeRange([]byte("test"), []byte("foo"), []byte("goo"), 0)
ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0)
if len(ks) != 1 {
t.Errorf("len(kvs) = %d, want 1", len(ks))
}
@@ -94,8 +95,8 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
for i := 0; i < 10; i++ {
@@ -126,9 +127,9 @@ func TestBackendDefrag(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
for i := 0; i < backend.DefragLimitForTest()+100; i++ {
tx.UnsafePut([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
}
tx.Unlock()
b.ForceCommit()
@@ -137,7 +138,7 @@ func TestBackendDefrag(t *testing.T) {
tx = b.BatchTx()
tx.Lock()
for i := 0; i < 50; i++ {
tx.UnsafeDelete([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)))
tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)))
}
tx.Unlock()
b.ForceCommit()
@@ -171,8 +172,8 @@ func TestBackendDefrag(t *testing.T) {
// try put more keys after shrink.
tx = b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("more"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
}
@@ -184,15 +185,15 @@ func TestBackendWriteback(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar"))
tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz"))
tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar"))
tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz"))
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
tx.Unlock()
// overwrites should be propagated too
tx.Lock()
tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
tx.Unlock()
keys := []struct {
@@ -242,12 +243,14 @@ func TestBackendWriteback(t *testing.T) {
}
rtx := b.ReadTx()
for i, tt := range keys {
rtx.RLock()
k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit)
rtx.RUnlock()
if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
}
func() {
rtx.RLock()
defer rtx.RUnlock()
k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit)
if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
}
}()
}
}
@@ -258,20 +261,20 @@ func TestConcurrentReadTx(t *testing.T) {
wtx1 := b.BatchTx()
wtx1.Lock()
wtx1.UnsafeCreateBucket([]byte("key"))
wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
wtx1.UnsafeCreateBucket(buckets.Key)
wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC"))
wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
wtx1.Unlock()
wtx2 := b.BatchTx()
wtx2.Lock()
wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF"))
wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
wtx2.Unlock()
rtx := b.ConcurrentReadTx()
rtx.RLock() // no-op
k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0)
rtx.RUnlock()
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
@@ -288,10 +291,10 @@ func TestBackendWritebackForEach(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafeCreateBucket(buckets.Key)
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
}
tx.Unlock()
@@ -299,10 +302,10 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafeCreateBucket(buckets.Key)
for i := 5; i < 20; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
}
tx.Unlock()
@@ -313,7 +316,7 @@ func TestBackendWritebackForEach(t *testing.T) {
}
rtx := b.ReadTx()
rtx.RLock()
assert.NoError(t, rtx.UnsafeForEach([]byte("key"), getSeq))
assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq))
rtx.RUnlock()
partialSeq := seq
@@ -322,7 +325,7 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
assert.NoError(t, tx.UnsafeForEach([]byte("key"), getSeq))
assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq))
tx.Unlock()
if seq != partialSeq {

View File

@@ -25,13 +25,30 @@ import (
"go.uber.org/zap"
)
type BucketID int
type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
// in the in-memory maps.
ID() BucketID
Name() []byte
// String implements Stringer (human readable name).
String() string
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
IsSafeRangeBucket() bool
}
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafeDeleteBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
@@ -69,24 +86,24 @@ func (t *batchTx) RUnlock() {
panic("unexpected RUnlock")
}
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
_, err := t.tx.CreateBucket(bucket.Name())
if err != nil && err != bolt.ErrBucketExists {
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Stringer("bucket-name", bucket),
zap.Error(err),
)
}
t.pending++
}
func (t *batchTx) UnsafeDeleteBucket(name []byte) {
err := t.tx.DeleteBucket(name)
func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
err := t.tx.DeleteBucket(bucket.Name())
if err != nil && err != bolt.ErrBucketNotFound {
t.backend.lg.Fatal(
"failed to delete a bucket",
zap.String("bucket-name", string(name)),
zap.Stringer("bucket-name", bucket),
zap.Error(err),
)
}
@@ -94,21 +111,21 @@ func (t *batchTx) UnsafeDeleteBucket(name []byte) {
}
// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false)
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, false)
}
// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, true)
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, true)
}
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@@ -120,7 +137,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
if err := bucket.Put(key, value); err != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
@@ -128,12 +145,12 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
}
// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@@ -163,12 +180,12 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
}
// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@@ -176,7 +193,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
if err != nil {
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
@@ -184,12 +201,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
}
// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucketName, visitor)
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucket, visitor)
}
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
if b := tx.Bucket(bucket); b != nil {
func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
if b := tx.Bucket(bucket.Name()); b != nil {
return b.ForEach(visitor)
}
return nil
@@ -253,8 +270,8 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
tx := &batchTxBuffered{
batchTx: batchTx{backend: backend},
buf: txWriteBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
bucket2seq: make(map[string]bool),
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bucket2seq: make(map[BucketID]bool),
},
}
tx.Commit()
@@ -316,12 +333,12 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
}
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafePut(bucket, key, value)
t.buf.put(bucket, key, value)
}
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value)
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

View File

@@ -22,6 +22,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func TestBatchTxPut(t *testing.T) {
@@ -33,18 +34,18 @@ func TestBatchTxPut(t *testing.T) {
tx.Lock()
// create bucket
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
// put
v := []byte("bar")
tx.UnsafePut([]byte("test"), []byte("foo"), v)
tx.UnsafePut(buckets.Test, []byte("foo"), v)
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
_, gv := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
_, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
tx.Unlock()
if !reflect.DeepEqual(gv[0], v) {
t.Errorf("v = %s, want %s", string(gv[0]), string(v))
@@ -61,12 +62,12 @@ func TestBatchTxRange(t *testing.T) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
// put keys
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")}
for i := range allKeys {
tx.UnsafePut([]byte("test"), allKeys[i], allVals[i])
tx.UnsafePut(buckets.Test, allKeys[i], allVals[i])
}
tests := []struct {
@@ -114,7 +115,7 @@ func TestBatchTxRange(t *testing.T) {
},
}
for i, tt := range tests {
keys, vals := tx.UnsafeRange([]byte("test"), tt.key, tt.endKey, tt.limit)
keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit)
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys)
}
@@ -131,17 +132,17 @@ func TestBatchTxDelete(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeDelete([]byte("test"), []byte("foo"))
tx.UnsafeDelete(buckets.Test, []byte("foo"))
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
ks, _ := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
tx.Unlock()
if len(ks) != 0 {
t.Errorf("keys on foo = %v, want nil", ks)
@@ -156,15 +157,15 @@ func TestBatchTxCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("test"))
bucket := tx.Bucket(buckets.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil
@@ -185,14 +186,14 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
// batch limit commit should have been triggered
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("test"))
bucket := tx.Bucket(buckets.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil

View File

@@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
var (
bucket = []byte("bucket")
bucket = buckets.Test
key = []byte("key")
)

View File

@@ -15,17 +15,15 @@
package backend
import (
"bytes"
"math"
"sync"
bolt "go.etcd.io/bbolt"
)
// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but IsSafeRangeBucket
// is known to never overwrite any key so range is safe.
var safeRangeBucket = []byte("key")
type ReadTx interface {
Lock()
@@ -33,8 +31,8 @@ type ReadTx interface {
RLock()
RUnlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
@@ -47,12 +45,12 @@ type baseReadTx struct {
// txMu protects accesses to buckets and tx on Range requests.
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
buckets map[BucketID]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}
func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
@@ -64,19 +62,19 @@ func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v
}
return visitor(k, v)
}
if err := baseReadTx.buf.ForEach(bucketName, getDups); err != nil {
if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
return err
}
baseReadTx.txMu.Lock()
err := unsafeForEach(baseReadTx.tx, bucketName, visitNoDup)
err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
baseReadTx.txMu.Unlock()
if err != nil {
return err
}
return baseReadTx.buf.ForEach(bucketName, visitor)
return baseReadTx.buf.ForEach(bucket, visitor)
}
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
@@ -84,16 +82,16 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
if limit > 1 && !bucketType.IsSafeRangeBucket() {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
// find/cache bucket
bn := string(bucketName)
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
@@ -101,7 +99,7 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit
if !ok {
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketName)
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
@@ -133,7 +131,7 @@ func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.buckets = make(map[BucketID]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

View File

@@ -23,7 +23,7 @@ const bucketBufferInitialSize = 512
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
buckets map[BucketID]*bucketBuffer
}
func (txb *txBuffer) reset() {
@@ -39,28 +39,26 @@ func (txb *txBuffer) reset() {
// txWriteBuffer buffers writes of pending updates that have not yet committed.
type txWriteBuffer struct {
txBuffer
// Map from bucket name into information whether this bucket is edited
// Map from bucket ID into information whether this bucket is edited
// sequentially (i.e. keys are growing monotonically).
bucket2seq map[string]bool
bucket2seq map[BucketID]bool
}
// TODO: Passing bucket as an (int) enum would avoid a lot of byte[]->string->hash conversions.
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
bucketstr := string(bucket)
txw.bucket2seq[bucketstr] = false
txw.putInternal(bucketstr, k, v)
func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
txw.bucket2seq[bucket.ID()] = false
txw.putInternal(bucket, k, v)
}
func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
// TODO: Add (in tests?) verification whether k>b[len(b)]
txw.putInternal(string(bucket), k, v)
txw.putInternal(bucket, k, v)
}
func (txw *txWriteBuffer) putInternal(bucket string, k, v []byte) {
b, ok := txw.buckets[bucket]
func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
b, ok := txw.buckets[bucket.ID()]
if !ok {
b = newBucketBuffer()
txw.buckets[bucket] = b
txw.buckets[bucket.ID()] = b
}
b.add(k, v)
}
@@ -103,15 +101,15 @@ type txReadBuffer struct {
bufVersion uint64
}
func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[bucket.ID()]; b != nil {
return b.Range(key, endKey, limit)
}
return nil, nil
}
func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
if b := txr.buckets[string(bucketName)]; b != nil {
func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
if b := txr.buckets[bucket.ID()]; b != nil {
return b.ForEach(visitor)
}
return nil
@@ -121,7 +119,7 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txrCopy := txReadBuffer{
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}