Merge 4142d49eeac7addd6335fc64877185b317e4e0a2 into 594427d28cf1547a02fb08db6e65ce72b02a8af6

This commit is contained in:
Siyuan Zhang 2024-09-17 13:57:12 +01:00 committed by GitHub
commit 14de1b199c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 104 additions and 7 deletions

View File

@ -239,24 +239,115 @@ func TestRangeAfterDeleteMatch(t *testing.T) {
tx.Unlock()
tx.Commit()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
tx.Lock()
tx.UnsafeDelete(buckets.Test, []byte("foo"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}
func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
func TestRangeAfterUnorderedKeyWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)
tx := b.BatchTx()
tx.Lock()
ks1, vs1 := tx.UnsafeRange(buckets.Test, key, endKey, limit)
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo5"), []byte("bar5"))
tx.UnsafePut(buckets.Test, []byte("foo2"), []byte("bar2"))
tx.UnsafePut(buckets.Test, []byte("foo1"), []byte("bar1"))
tx.UnsafePut(buckets.Test, []byte("foo3"), []byte("bar3"))
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafePut(buckets.Test, []byte("foo4"), []byte("bar4"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), nil, 1)
}
func TestRangeAfterAlternatingBucketWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafeSeqPut(buckets.Key, []byte("key1"), []byte("val1"))
tx.Unlock()
tx.Lock()
tx.UnsafeSeqPut(buckets.Key, []byte("key2"), []byte("val2"))
tx.Unlock()
tx.Commit()
tx.Commit()
tx.Lock()
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), []byte("foo3"), 1)
}
func TestRangeAfterOverwriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(buckets.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar1"))
tx.UnsafePut(buckets.Test, []byte("foo1"), []byte("bar11"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), []byte("foo3"), 1)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
}
func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(buckets.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar1"))
tx.UnsafePut(buckets.Test, []byte("foo1"), []byte("bar11"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
tx.Lock()
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar3"))
tx.UnsafeDelete(buckets.Test, []byte("foo1"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), buckets.Test, []byte("foo1"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")})
}
func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, bucket backend.Bucket, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange(bucket, key, endKey, limit)
tx.Unlock()
rtx.RLock()
ks2, vs2 := rtx.UnsafeRange(buckets.Test, key, endKey, limit)
ks2, vs2 := rtx.UnsafeRange(bucket, key, endKey, limit)
rtx.RUnlock()
if diff := cmp.Diff(ks1, ks2); diff != "" {

View File

@ -80,6 +80,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
wb.dedupe()
txr.buckets[k] = wb
continue
}
@ -201,10 +202,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
return
}
bb.dedupe()
}
// dedupe removes duplicates, using only newest update
func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}
sort.Stable(bb)
// remove duplicates, using only newest update
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {