diff --git a/server/storage/backend/batch_tx_test.go b/server/storage/backend/batch_tx_test.go index cc099d1f9..d0f6dcdc0 100644 --- a/server/storage/backend/batch_tx_test.go +++ b/server/storage/backend/batch_tx_test.go @@ -252,6 +252,53 @@ func TestRangeAfterDeleteMatch(t *testing.T) { checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) } +func TestRangeAfterOverwriteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), []byte("foo3"), 1) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) +} + +func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) + + tx.Lock() + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3")) + tx.UnsafeDelete(schema.Test, []byte("foo1")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")}) +} + func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) { tx.Lock() ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit) diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index 7c2f9d63a..590cf2af6 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -83,6 +83,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { if !ok { delete(txw.buckets, k) txr.buckets[k] = wb + wb.dedupe() continue } if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { @@ -203,10 +204,12 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { return } + bb.dedupe() +} +// dedupe removes duplicates, using only newest update +func (bb *bucketBuffer) dedupe() { sort.Stable(bb) - - // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {