mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17228 from siyuanfoundation/txBuf1
Fix tx buffer inconsistency if there are duplicate keys in one tx.
This commit is contained in:
commit
ed994248e0
@ -252,6 +252,53 @@ func TestRangeAfterDeleteMatch(t *testing.T) {
|
||||
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
|
||||
}
|
||||
|
||||
func TestRangeAfterOverwriteMatch(t *testing.T) {
|
||||
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||
defer betesting.Close(t, b)
|
||||
|
||||
tx := b.BatchTx()
|
||||
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(schema.Test)
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
|
||||
tx.Unlock()
|
||||
|
||||
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), []byte("foo3"), 1)
|
||||
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
|
||||
}
|
||||
|
||||
func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) {
|
||||
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||
defer betesting.Close(t, b)
|
||||
|
||||
tx := b.BatchTx()
|
||||
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(schema.Test)
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
|
||||
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
|
||||
tx.Unlock()
|
||||
|
||||
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
|
||||
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
|
||||
|
||||
tx.Lock()
|
||||
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3"))
|
||||
tx.UnsafeDelete(schema.Test, []byte("foo1"))
|
||||
tx.Unlock()
|
||||
|
||||
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
|
||||
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0)
|
||||
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")})
|
||||
}
|
||||
|
||||
func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
|
||||
tx.Lock()
|
||||
ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
|
||||
|
@ -83,6 +83,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
||||
if !ok {
|
||||
delete(txw.buckets, k)
|
||||
txr.buckets[k] = wb
|
||||
wb.dedupe()
|
||||
continue
|
||||
}
|
||||
if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
|
||||
@ -203,10 +204,12 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
|
||||
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
|
||||
return
|
||||
}
|
||||
bb.dedupe()
|
||||
}
|
||||
|
||||
// dedupe removes duplicates, using only newest update
|
||||
func (bb *bucketBuffer) dedupe() {
|
||||
sort.Stable(bb)
|
||||
|
||||
// remove duplicates, using only newest update
|
||||
widx := 0
|
||||
for ridx := 1; ridx < bb.used; ridx++ {
|
||||
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user