*: keep tombstone if revision == compactAtRev

Before this patch, the tombstone can be deleted if its revision is equal
compacted revision. It causes that the watch subscriber won't get this
DELETE event. Based on Compact API[1], we should keep tombstone revision
if it's not less than the compaction revision.

> CompactionRequest compacts the key-value store up to a given revision.
> All superseded keys with a revision less than the compaction revision
> will be removed.

[1]: https://etcd.io/docs/latest/dev-guide/api_reference_v3/

Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
Wei Fu 2024-07-04 14:35:51 +08:00
parent ee33652775
commit bbdc94181a
4 changed files with 501 additions and 99 deletions

View File

@ -63,6 +63,9 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) {
if !upper.GreaterThan(kr) { if !upper.GreaterThan(kr) {
return return
} }
isTombstone := BytesToBucketKey(k).tombstone
lower := Revision{Main: h.compactRevision + 1} lower := Revision{Main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion // skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one. // due to compacting; don't skip if there isn't one.
@ -71,6 +74,17 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) {
return return
} }
} }
// When performing compaction, if the compacted revision is a
// tombstone, older versions (<= 3.5.15 or <= 3.4.33) will delete
// the tombstone. But newer versions (> 3.5.15 or > 3.4.33) won't
// delete it. So we should skip the tombstone in such cases when
// computing the hash to ensure that both older and newer versions
// can always generate the same hash values.
if kr.Main == h.compactRevision && isTombstone {
return
}
h.hash.Write(k) h.hash.Write(k)
h.hash.Write(v) h.hash.Write(v)
} }

View File

@ -18,7 +18,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/google/btree" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -235,101 +235,445 @@ func TestIndexRevision(t *testing.T) {
func TestIndexCompactAndKeep(t *testing.T) { func TestIndexCompactAndKeep(t *testing.T) {
maxRev := int64(20) maxRev := int64(20)
tests := []struct {
key []byte // key: "foo"
remove bool // modified: 10
rev Revision // generations:
created Revision // {{10, 0}}
ver int64 // {{1, 0}, {5, 0}, {9, 0}(t)}
}{ //
{[]byte("foo"), false, Revision{Main: 1}, Revision{Main: 1}, 1}, // key: "foo1"
{[]byte("foo1"), false, Revision{Main: 2}, Revision{Main: 2}, 1}, // modified: 10, 1
{[]byte("foo2"), false, Revision{Main: 3}, Revision{Main: 3}, 1}, // generations:
{[]byte("foo2"), false, Revision{Main: 4}, Revision{Main: 3}, 2}, // {{10, 1}}
{[]byte("foo"), false, Revision{Main: 5}, Revision{Main: 1}, 2}, // {{2, 0}, {6, 0}, {7, 0}(t)}
{[]byte("foo1"), false, Revision{Main: 6}, Revision{Main: 2}, 2}, //
{[]byte("foo1"), true, Revision{Main: 7}, Revision{}, 0}, // key: "foo2"
{[]byte("foo2"), true, Revision{Main: 8}, Revision{}, 0}, // modified: 8
{[]byte("foo"), true, Revision{Main: 9}, Revision{}, 0}, // generations:
{[]byte("foo"), false, Revision{Main: 10}, Revision{Main: 10}, 1}, // {empty}
{[]byte("foo1"), false, Revision{Main: 10, Sub: 1}, Revision{Main: 10, Sub: 1}, 1}, // {{3, 0}, {4, 0}, {8, 0}(t)}
//
buildTreeIndex := func() index {
ti := newTreeIndex(zaptest.NewLogger(t))
ti.Put([]byte("foo"), Revision{Main: 1})
ti.Put([]byte("foo1"), Revision{Main: 2})
ti.Put([]byte("foo2"), Revision{Main: 3})
ti.Put([]byte("foo2"), Revision{Main: 4})
ti.Put([]byte("foo"), Revision{Main: 5})
ti.Put([]byte("foo1"), Revision{Main: 6})
require.NoError(t, ti.Tombstone([]byte("foo1"), Revision{Main: 7}))
require.NoError(t, ti.Tombstone([]byte("foo2"), Revision{Main: 8}))
require.NoError(t, ti.Tombstone([]byte("foo"), Revision{Main: 9}))
ti.Put([]byte("foo"), Revision{Main: 10})
ti.Put([]byte("foo1"), Revision{Main: 10, Sub: 1})
return ti
} }
afterCompacts := []struct {
atRev int
keyIndexes []keyIndex
keep map[Revision]struct{}
compacted map[Revision]struct{}
}{
{
atRev: 1,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 1}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 1}: {},
},
},
{
atRev: 2,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
},
},
{
atRev: 3,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
Revision{Main: 3}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
Revision{Main: 3}: {},
},
},
{
atRev: 4,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
Revision{Main: 4}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 1}: {},
Revision{Main: 2}: {},
Revision{Main: 4}: {},
},
},
{
atRev: 5,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 2}: {},
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 2}: {},
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
},
{
atRev: 6,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 6}, Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 6}: {},
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 6}: {},
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
},
{
atRev: 7,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 7}}},
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 7}: {},
Revision{Main: 4}: {},
Revision{Main: 5}: {},
},
},
{
atRev: 8,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
{
key: []byte("foo2"),
modified: Revision{Main: 8},
generations: []generation{
{ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 8}}},
{},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 5}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 8}: {},
Revision{Main: 5}: {},
},
},
{
atRev: 9,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 9}}},
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
},
keep: map[Revision]struct{}{},
compacted: map[Revision]struct{}{
Revision{Main: 9}: {},
},
},
{
atRev: 10,
keyIndexes: []keyIndex{
{
key: []byte("foo"),
modified: Revision{Main: 10},
generations: []generation{
{ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}},
},
},
{
key: []byte("foo1"),
modified: Revision{Main: 10, Sub: 1},
generations: []generation{
{ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}},
},
},
},
keep: map[Revision]struct{}{
Revision{Main: 10}: {},
Revision{Main: 10, Sub: 1}: {},
},
compacted: map[Revision]struct{}{
Revision{Main: 10}: {},
Revision{Main: 10, Sub: 1}: {},
},
},
}
ti := buildTreeIndex()
// Continuous Compact and Keep // Continuous Compact and Keep
ti := newTreeIndex(zaptest.NewLogger(t))
for _, tt := range tests {
if tt.remove {
ti.Tombstone(tt.key, tt.rev)
} else {
ti.Put(tt.key, tt.rev)
}
}
for i := int64(1); i < maxRev; i++ { for i := int64(1); i < maxRev; i++ {
j := i - 1
if i >= int64(len(afterCompacts)) {
j = int64(len(afterCompacts)) - 1
}
am := ti.Compact(i) am := ti.Compact(i)
require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i)
keep := ti.Keep(i) keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) { require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i)
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
} nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex)
wti := &treeIndex{tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool { for k := range afterCompacts[j].keyIndexes {
return aki.Less(bki) ki := afterCompacts[j].keyIndexes[k]
})} nti.tree.ReplaceOrInsert(&ki)
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove {
wti.Tombstone(tt.key, tt.rev)
} else {
restore(wti, tt.key, tt.created, tt.rev, tt.ver)
}
}
}
if !ti.Equal(wti) {
t.Errorf("#%d: not equal ti", i)
} }
require.True(t, ti.Equal(nti), "#%d: not equal ti", i)
} }
// Once Compact and Keep // Once Compact and Keep
for i := int64(1); i < maxRev; i++ { for i := int64(1); i < maxRev; i++ {
ti := newTreeIndex(zaptest.NewLogger(t)) ti := buildTreeIndex()
for _, tt := range tests {
if tt.remove { j := i - 1
ti.Tombstone(tt.key, tt.rev) if i >= int64(len(afterCompacts)) {
} else { j = int64(len(afterCompacts)) - 1
ti.Put(tt.key, tt.rev)
}
} }
am := ti.Compact(i) am := ti.Compact(i)
require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i)
keep := ti.Keep(i) keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) { require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i)
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
} nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex)
wti := &treeIndex{tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool { for k := range afterCompacts[j].keyIndexes {
return aki.Less(bki) ki := afterCompacts[j].keyIndexes[k]
})} nti.tree.ReplaceOrInsert(&ki)
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove {
wti.Tombstone(tt.key, tt.rev)
} else {
restore(wti, tt.key, tt.created, tt.rev, tt.ver)
}
}
}
if !ti.Equal(wti) {
t.Errorf("#%d: not equal ti", i)
}
}
} }
func restore(ti *treeIndex, key []byte, created, modified Revision, ver int64) { require.True(t, ti.Equal(nti), "#%d: not equal ti", i)
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
okeyi, _ := ti.tree.Get(keyi)
if okeyi == nil {
keyi.restore(ti.lg, created, modified, ver)
ti.tree.ReplaceOrInsert(keyi)
return
} }
okeyi.put(ti.lg, modified.Main, modified.Sub)
} }

View File

@ -65,7 +65,8 @@ var (
// compact(5): // compact(5):
// generations: // generations:
// //
// {empty} -> key SHOULD be removed. // {empty}
// {5.0(t)}
// //
// compact(6): // compact(6):
// generations: // generations:
@ -202,8 +203,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []Revision {
} }
// compact compacts a keyIndex by removing the versions with smaller or equal // compact compacts a keyIndex by removing the versions with smaller or equal
// revision than the given atRev except the largest one (If the largest one is // revision than the given atRev except the largest one.
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed. // If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]struct{}) { func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]struct{}) {
if ki.isEmpty() { if ki.isEmpty() {
@ -221,11 +221,6 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]
if revIndex != -1 { if revIndex != -1 {
g.revs = g.revs[revIndex:] g.revs = g.revs[revIndex:]
} }
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
} }
// remove the previous generations. // remove the previous generations.
@ -241,7 +236,12 @@ func (ki *keyIndex) keep(atRev int64, available map[Revision]struct{}) {
genIdx, revIndex := ki.doCompact(atRev, available) genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx] g := &ki.generations[genIdx]
if !g.isEmpty() { if !g.isEmpty() {
// remove any tombstone // If the given `atRev` is a tombstone, we need to skip it.
//
// Note that this s different from the `compact` function which
// keeps tombstone in such case. We need to stay consistent with
// existing versions, ensuring they always generate the same hash
// values.
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex]) delete(available, g.revs[revIndex])
} }
@ -262,7 +262,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[Revision]struct{}) (gen
genIdx, g := 0, &ki.generations[0] genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev // find first generation includes atRev or created after atRev
for genIdx < len(ki.generations)-1 { for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].Main; tomb > atRev { if tomb := g.revs[len(g.revs)-1].Main; tomb >= atRev {
break break
} }
genIdx++ genIdx++

View File

@ -18,6 +18,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -308,12 +309,15 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
key: []byte("foo"), key: []byte("foo"),
modified: Revision{Main: 16}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 15, Sub: 1}, Revision{Main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 15, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[Revision]struct{}{}, map[Revision]struct{}{
Revision{Main: 6}: {},
},
}, },
{ {
7, 7,
@ -394,11 +398,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
key: []byte("foo"), key: []byte("foo"),
modified: Revision{Main: 16}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 15, Sub: 1}, Revision{Main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 15, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[Revision]struct{}{}, map[Revision]struct{}{
Revision{Main: 12}: {},
},
}, },
{ {
13, 13,
@ -442,6 +449,20 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
}, },
{ {
16, 16,
&keyIndex{
key: []byte("foo"),
modified: Revision{Main: 16},
generations: []generation{
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 16}}},
{},
},
},
map[Revision]struct{}{
Revision{Main: 16}: {},
},
},
{
17,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: Revision{Main: 16}, modified: Revision{Main: 16},
@ -453,18 +474,36 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
}, },
} }
isTombstoneRevFn := func(ki *keyIndex, rev int64) bool {
for i := 0; i < len(ki.generations)-1; i++ {
g := ki.generations[i]
if l := len(g.revs); l > 0 && g.revs[l-1].Main == rev {
return true
}
}
return false
}
// Continuous Compaction and finding Keep // Continuous Compaction and finding Keep
ki := newTestKeyIndex(zaptest.NewLogger(t)) ki := newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests { for i, tt := range tests {
isTombstone := isTombstoneRevFn(ki, tt.compact)
am := make(map[Revision]struct{}) am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki) kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am) ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) { if !reflect.DeepEqual(ki, kiclone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone) t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
} }
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) if isTombstone {
assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki)
} else {
assert.Equal(t, tt.wam, am,
"#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki)
} }
am = make(map[Revision]struct{}) am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am) ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) { if !reflect.DeepEqual(ki, tt.wki) {
@ -478,7 +517,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
// Jump Compaction and finding Keep // Jump Compaction and finding Keep
ki = newTestKeyIndex(zaptest.NewLogger(t)) ki = newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests { for i, tt := range tests {
if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { if !isTombstoneRevFn(ki, tt.compact) {
am := make(map[Revision]struct{}) am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki) kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am) ki.keep(tt.compact, am)
@ -508,9 +547,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(ki, kiClone) { if !reflect.DeepEqual(ki, kiClone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
} }
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) if isTombstoneRevFn(ki, tt.compact) {
assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki)
} else {
assert.Equal(t, tt.wam, am,
"#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki)
} }
am = make(map[Revision]struct{}) am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am) ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) { if !reflect.DeepEqual(ki, tt.wki) {