Merge pull request #16269 from dusk125/refactor-revision

Refactor common revision code to pkg
This commit is contained in:
Marek Siarkowicz 2023-10-10 10:16:58 +02:00 committed by GitHub
commit 16e19a9547
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 470 additions and 554 deletions

View File

@ -1,53 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package snapshot
import (
"encoding/binary"
)
type revision struct {
main int64
sub int64
}
// GreaterThan should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub
}
// bytesToRev should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}
// revToBytes should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func revToBytes(bytes []byte, rev revision) {
binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}

View File

@ -156,8 +156,8 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("cannot write to bucket %s", herr.Error())
}
if iskeyb {
rev := bytesToRev(k)
ds.Revision = rev.main
rev := mvcc.BytesToRev(k)
ds.Revision = rev.Main
}
ds.TotalKey++
return nil
@ -346,42 +346,42 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
return err
}
latest = s.unsafeBumpRevision(tx, latest, int64(bumpAmount))
latest = s.unsafeBumpBucketsRevision(tx, latest, int64(bumpAmount))
s.unsafeMarkRevisionCompacted(tx, latest)
return nil
}
func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision {
func (s *v3Manager) unsafeBumpBucketsRevision(tx backend.UnsafeWriter, latest mvcc.Revision, amount int64) mvcc.Revision {
s.lg.Info(
"bumping latest revision",
zap.Int64("latest-revision", latest.main),
zap.Int64("latest-revision", latest.Main),
zap.Int64("bump-amount", amount),
zap.Int64("new-latest-revision", latest.main+amount),
zap.Int64("new-latest-revision", latest.Main+amount),
)
latest.main += amount
latest.sub = 0
k := make([]byte, 17)
revToBytes(k, latest)
latest.Main += amount
latest.Sub = 0
k := mvcc.NewRevBytes()
k = mvcc.RevToBytes(latest, k)
tx.UnsafePut(schema.Key, k, []byte{})
return latest
}
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) {
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest mvcc.Revision) {
s.lg.Info(
"marking revision compacted",
zap.Int64("revision", latest.main),
zap.Int64("revision", latest.Main),
)
mvcc.UnsafeSetScheduledCompact(tx, latest.main)
mvcc.UnsafeSetScheduledCompact(tx, latest.Main)
}
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) {
var latest revision
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (mvcc.Revision, error) {
var latest mvcc.Revision
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
rev := bytesToRev(k)
rev := mvcc.BytesToRev(k)
if rev.GreaterThan(latest) {
latest = rev

View File

@ -30,7 +30,7 @@ const (
hashStorageMaxSize = 10
)
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[Revision]struct{}) (KeyValueHash, error) {
h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v)
@ -43,10 +43,10 @@ type kvHasher struct {
hash hash.Hash32
compactRevision int64
revision int64
keep map[revision]struct{}
keep map[Revision]struct{}
}
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
func newKVHasher(compactRev, rev int64, keep map[Revision]struct{}) kvHasher {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(schema.Key.Name())
return kvHasher{
@ -58,12 +58,12 @@ func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
}
func (h *kvHasher) WriteKeyValue(k, v []byte) {
kr := bytesToRev(k)
upper := revision{main: h.revision + 1}
kr := BytesToRev(k)
upper := Revision{Main: h.revision + 1}
if !upper.GreaterThan(kr) {
return
}
lower := revision{main: h.compactRevision + 1}
lower := Revision{Main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(h.keep) > 0 {

View File

@ -22,14 +22,14 @@ import (
)
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
Get(key []byte, atRev int64) (rev, created Revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []Revision)
Revisions(key, end []byte, atRev int64, limit int) ([]Revision, int)
CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Put(key []byte, rev Revision)
Tombstone(key []byte, rev Revision) error
Compact(rev int64) map[Revision]struct{}
Keep(rev int64) map[Revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
@ -51,30 +51,30 @@ func newTreeIndex(lg *zap.Logger) index {
}
}
func (ti *treeIndex) Put(key []byte, rev revision) {
func (ti *treeIndex) Put(key []byte, rev Revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)
if !ok {
keyi.put(ti.lg, rev.main, rev.sub)
keyi.put(ti.lg, rev.Main, rev.Sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.main, rev.sub)
okeyi.put(ti.lg, rev.Main, rev.Sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
ti.RLock()
defer ti.RUnlock()
return ti.unsafeGet(key, atRev)
}
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
keyi := &keyIndex{key: key}
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
return keyi.get(ti.lg, atRev)
}
@ -109,7 +109,7 @@ func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
// Revisions returns limited number of revisions from key(included) to end(excluded)
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {
ti.RLock()
defer ti.RUnlock()
@ -118,7 +118,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
if err != nil {
return nil, 0
}
return []revision{rev}, 1
return []Revision{rev}, 1
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
@ -155,7 +155,7 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
return total
}
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []Revision) {
ti.RLock()
defer ti.RUnlock()
@ -164,7 +164,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
if err != nil {
return nil, nil
}
return [][]byte{key}, []revision{rev}
return [][]byte{key}, []Revision{rev}
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
@ -176,7 +176,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return keys, revs
}
func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
func (ti *treeIndex) Tombstone(key []byte, rev Revision) error {
keyi := &keyIndex{key: key}
ti.Lock()
@ -186,11 +186,11 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
return ErrRevisionNotFound
}
return ki.tombstone(ti.lg, rev.main, rev.sub)
return ki.tombstone(ti.lg, rev.Main, rev.Sub)
}
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
@ -214,8 +214,8 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
}
// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
func (ti *treeIndex) Keep(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool {

View File

@ -33,7 +33,7 @@ func benchmarkIndexCompact(b *testing.B, size int) {
bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
@ -49,7 +49,7 @@ func BenchmarkIndexPut(b *testing.B) {
keys := createBytesSlice(bytesN, b.N)
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
}
}
@ -60,7 +60,7 @@ func BenchmarkIndexGet(b *testing.B) {
bytesN := 64
keys := createBytesSlice(bytesN, b.N)
for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {

View File

@ -24,25 +24,25 @@ import (
func TestIndexGet(t *testing.T) {
ti := newTreeIndex(zaptest.NewLogger(t))
ti.Put([]byte("foo"), revision{main: 2})
ti.Put([]byte("foo"), revision{main: 4})
ti.Tombstone([]byte("foo"), revision{main: 6})
ti.Put([]byte("foo"), Revision{Main: 2})
ti.Put([]byte("foo"), Revision{Main: 4})
ti.Tombstone([]byte("foo"), Revision{Main: 6})
tests := []struct {
rev int64
wrev revision
wcreated revision
wrev Revision
wcreated Revision
wver int64
werr error
}{
{0, revision{}, revision{}, 0, ErrRevisionNotFound},
{1, revision{}, revision{}, 0, ErrRevisionNotFound},
{2, revision{main: 2}, revision{main: 2}, 1, nil},
{3, revision{main: 2}, revision{main: 2}, 1, nil},
{4, revision{main: 4}, revision{main: 2}, 2, nil},
{5, revision{main: 4}, revision{main: 2}, 2, nil},
{6, revision{}, revision{}, 0, ErrRevisionNotFound},
{0, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{1, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{2, Revision{Main: 2}, Revision{Main: 2}, 1, nil},
{3, Revision{Main: 2}, Revision{Main: 2}, 1, nil},
{4, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{5, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{6, Revision{}, Revision{}, 0, ErrRevisionNotFound},
}
for i, tt := range tests {
rev, created, ver, err := ti.Get([]byte("foo"), tt.rev)
@ -63,7 +63,7 @@ func TestIndexGet(t *testing.T) {
func TestIndexRange(t *testing.T) {
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
allRevs := []revision{{main: 1}, {main: 2}, {main: 3}}
allRevs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}
ti := newTreeIndex(zaptest.NewLogger(t))
for i := range allKeys {
@ -74,7 +74,7 @@ func TestIndexRange(t *testing.T) {
tests := []struct {
key, end []byte
wkeys [][]byte
wrevs []revision
wrevs []Revision
}{
// single key that not found
{
@ -122,9 +122,9 @@ func TestIndexRange(t *testing.T) {
func TestIndexTombstone(t *testing.T) {
ti := newTreeIndex(zaptest.NewLogger(t))
ti.Put([]byte("foo"), revision{main: 1})
ti.Put([]byte("foo"), Revision{Main: 1})
err := ti.Tombstone([]byte("foo"), revision{main: 2})
err := ti.Tombstone([]byte("foo"), Revision{Main: 2})
if err != nil {
t.Errorf("tombstone error = %v, want nil", err)
}
@ -133,7 +133,7 @@ func TestIndexTombstone(t *testing.T) {
if err != ErrRevisionNotFound {
t.Errorf("get error = %v, want ErrRevisionNotFound", err)
}
err = ti.Tombstone([]byte("foo"), revision{main: 3})
err = ti.Tombstone([]byte("foo"), Revision{Main: 3})
if err != ErrRevisionNotFound {
t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound)
}
@ -141,7 +141,7 @@ func TestIndexTombstone(t *testing.T) {
func TestIndexRevision(t *testing.T) {
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")}
allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}}
allRevs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 5}, Revision{Main: 6}}
ti := newTreeIndex(zaptest.NewLogger(t))
for i := range allKeys {
@ -152,7 +152,7 @@ func TestIndexRevision(t *testing.T) {
key, end []byte
atRev int64
limit int
wrevs []revision
wrevs []Revision
wcounts int
}{
// single key that not found
@ -161,23 +161,23 @@ func TestIndexRevision(t *testing.T) {
},
// single key that found
{
[]byte("foo"), nil, 6, 0, []revision{{main: 6}}, 1,
[]byte("foo"), nil, 6, 0, []Revision{Revision{Main: 6}}, 1,
},
// various range keys, fixed atRev, unlimited
{
[]byte("foo"), []byte("foo1"), 6, 0, []revision{{main: 6}}, 1,
[]byte("foo"), []byte("foo1"), 6, 0, []Revision{Revision{Main: 6}}, 1,
},
{
[]byte("foo"), []byte("foo2"), 6, 0, []revision{{main: 6}, {main: 5}}, 2,
[]byte("foo"), []byte("foo2"), 6, 0, []Revision{Revision{Main: 6}, Revision{Main: 5}}, 2,
},
{
[]byte("foo"), []byte("fop"), 6, 0, []revision{{main: 6}, {main: 5}, {main: 4}}, 3,
[]byte("foo"), []byte("fop"), 6, 0, []Revision{Revision{Main: 6}, Revision{Main: 5}, Revision{Main: 4}}, 3,
},
{
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2,
[]byte("foo1"), []byte("fop"), 6, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
},
{
[]byte("foo2"), []byte("fop"), 6, 0, []revision{{main: 4}}, 1,
[]byte("foo2"), []byte("fop"), 6, 0, []Revision{Revision{Main: 4}}, 1,
},
{
[]byte("foo3"), []byte("fop"), 6, 0, nil, 0,
@ -187,38 +187,38 @@ func TestIndexRevision(t *testing.T) {
[]byte("foo1"), []byte("fop"), 1, 0, nil, 0,
},
{
[]byte("foo1"), []byte("fop"), 2, 0, []revision{{main: 2}}, 1,
[]byte("foo1"), []byte("fop"), 2, 0, []Revision{Revision{Main: 2}}, 1,
},
{
[]byte("foo1"), []byte("fop"), 3, 0, []revision{{main: 2}, {main: 3}}, 2,
[]byte("foo1"), []byte("fop"), 3, 0, []Revision{Revision{Main: 2}, Revision{Main: 3}}, 2,
},
{
[]byte("foo1"), []byte("fop"), 4, 0, []revision{{main: 2}, {main: 4}}, 2,
[]byte("foo1"), []byte("fop"), 4, 0, []Revision{Revision{Main: 2}, Revision{Main: 4}}, 2,
},
{
[]byte("foo1"), []byte("fop"), 5, 0, []revision{{main: 5}, {main: 4}}, 2,
[]byte("foo1"), []byte("fop"), 5, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
},
{
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2,
[]byte("foo1"), []byte("fop"), 6, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
},
// fixed range keys, fixed atRev, various limit
{
[]byte("foo"), []byte("fop"), 6, 1, []revision{{main: 6}}, 3,
[]byte("foo"), []byte("fop"), 6, 1, []Revision{Revision{Main: 6}}, 3,
},
{
[]byte("foo"), []byte("fop"), 6, 2, []revision{{main: 6}, {main: 5}}, 3,
[]byte("foo"), []byte("fop"), 6, 2, []Revision{Revision{Main: 6}, Revision{Main: 5}}, 3,
},
{
[]byte("foo"), []byte("fop"), 6, 3, []revision{{main: 6}, {main: 5}, {main: 4}}, 3,
[]byte("foo"), []byte("fop"), 6, 3, []Revision{Revision{Main: 6}, Revision{Main: 5}, Revision{Main: 4}}, 3,
},
{
[]byte("foo"), []byte("fop"), 3, 1, []revision{{main: 1}}, 3,
[]byte("foo"), []byte("fop"), 3, 1, []Revision{Revision{Main: 1}}, 3,
},
{
[]byte("foo"), []byte("fop"), 3, 2, []revision{{main: 1}, {main: 2}}, 3,
[]byte("foo"), []byte("fop"), 3, 2, []Revision{Revision{Main: 1}, Revision{Main: 2}}, 3,
},
{
[]byte("foo"), []byte("fop"), 3, 3, []revision{{main: 1}, {main: 2}, {main: 3}}, 3,
[]byte("foo"), []byte("fop"), 3, 3, []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}, 3,
},
}
for i, tt := range tests {
@ -238,21 +238,21 @@ func TestIndexCompactAndKeep(t *testing.T) {
tests := []struct {
key []byte
remove bool
rev revision
created revision
rev Revision
created Revision
ver int64
}{
{[]byte("foo"), false, revision{main: 1}, revision{main: 1}, 1},
{[]byte("foo1"), false, revision{main: 2}, revision{main: 2}, 1},
{[]byte("foo2"), false, revision{main: 3}, revision{main: 3}, 1},
{[]byte("foo2"), false, revision{main: 4}, revision{main: 3}, 2},
{[]byte("foo"), false, revision{main: 5}, revision{main: 1}, 2},
{[]byte("foo1"), false, revision{main: 6}, revision{main: 2}, 2},
{[]byte("foo1"), true, revision{main: 7}, revision{}, 0},
{[]byte("foo2"), true, revision{main: 8}, revision{}, 0},
{[]byte("foo"), true, revision{main: 9}, revision{}, 0},
{[]byte("foo"), false, revision{10, 0}, revision{10, 0}, 1},
{[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1},
{[]byte("foo"), false, Revision{Main: 1}, Revision{Main: 1}, 1},
{[]byte("foo1"), false, Revision{Main: 2}, Revision{Main: 2}, 1},
{[]byte("foo2"), false, Revision{Main: 3}, Revision{Main: 3}, 1},
{[]byte("foo2"), false, Revision{Main: 4}, Revision{Main: 3}, 2},
{[]byte("foo"), false, Revision{Main: 5}, Revision{Main: 1}, 2},
{[]byte("foo1"), false, Revision{Main: 6}, Revision{Main: 2}, 2},
{[]byte("foo1"), true, Revision{Main: 7}, Revision{}, 0},
{[]byte("foo2"), true, Revision{Main: 8}, Revision{}, 0},
{[]byte("foo"), true, Revision{Main: 9}, Revision{}, 0},
{[]byte("foo"), false, Revision{Main: 10}, Revision{Main: 10}, 1},
{[]byte("foo1"), false, Revision{Main: 10, Sub: 1}, Revision{Main: 10, Sub: 1}, 1},
}
// Continuous Compact and Keep
@ -274,7 +274,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
return aki.Less(bki)
})}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove {
wti.Tombstone(tt.key, tt.rev)
} else {
@ -306,7 +306,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
return aki.Less(bki)
})}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove {
wti.Tombstone(tt.key, tt.rev)
} else {
@ -320,7 +320,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
}
}
func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
func restore(ti *treeIndex, key []byte, created, modified Revision, ver int64) {
keyi := &keyIndex{key: key}
ti.Lock()
@ -331,5 +331,5 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, modified.main, modified.sub)
okeyi.put(ti.lg, modified.Main, modified.Sub)
}

View File

@ -73,21 +73,21 @@ var (
// {empty} -> key SHOULD be removed.
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
modified Revision // the main rev of the last modification
generations []generation
}
// put puts a revision to the keyIndex.
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}
rev := Revision{Main: main, Sub: sub}
if !rev.GreaterThan(ki.modified) {
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
zap.Int64("given-revision-main", rev.Main),
zap.Int64("given-revision-sub", rev.Sub),
zap.Int64("modified-revision-main", ki.modified.Main),
zap.Int64("modified-revision-sub", ki.modified.Sub),
)
}
if len(ki.generations) == 0 {
@ -103,7 +103,7 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
ki.modified = rev
}
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int64) {
if len(ki.generations) != 0 {
lg.Panic(
"'restore' got an unexpected non-empty generations",
@ -112,7 +112,7 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6
}
ki.modified = modified
g := generation{created: created, ver: ver, revs: []revision{modified}}
g := generation{created: created, ver: ver, revs: []Revision{modified}}
ki.generations = append(ki.generations, g)
keysGauge.Inc()
}
@ -138,7 +138,7 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be smaller than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {
if ki.isEmpty() {
lg.Panic(
"'get' got an unexpected empty keyIndex",
@ -147,28 +147,28 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
n := g.walk(func(rev revision) bool { return rev.main > atRev })
n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}
return revision{}, revision{}, 0, ErrRevisionNotFound
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
// since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same
// main revision.
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []Revision {
if ki.isEmpty() {
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
since := revision{rev, 0}
since := Revision{Main: rev}
var gi int
// find the generations to start checking
for gi = len(ki.generations) - 1; gi > 0; gi-- {
@ -181,21 +181,21 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
}
}
var revs []revision
var revs []Revision
var last int64
for ; gi < len(ki.generations); gi++ {
for _, r := range ki.generations[gi].revs {
if since.GreaterThan(r) {
continue
}
if r.main == last {
if r.Main == last {
// replace the revision with a new one that has higher sub value,
// because the original one should not be seen by external
revs[len(revs)-1] = r
continue
}
revs = append(revs, r)
last = r.main
last = r.Main
}
}
return revs
@ -205,7 +205,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
// revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]struct{}) {
if ki.isEmpty() {
lg.Panic(
"'compact' got an unexpected empty keyIndex",
@ -233,7 +233,7 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]
}
// keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
func (ki *keyIndex) keep(atRev int64, available map[Revision]struct{}) {
if ki.isEmpty() {
return
}
@ -248,11 +248,11 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
}
}
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
func (ki *keyIndex) doCompact(atRev int64, available map[Revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision smaller or equal to "atRev",
// and add the revision to the available map
f := func(rev revision) bool {
if rev.main <= atRev {
f := func(rev Revision) bool {
if rev.Main <= atRev {
available[rev] = struct{}{}
return false
}
@ -262,7 +262,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (gen
genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
if tomb := g.revs[len(g.revs)-1].Main; tomb > atRev {
break
}
genIdx++
@ -292,11 +292,11 @@ func (ki *keyIndex) findGeneration(rev int64) *generation {
}
g := ki.generations[cg]
if cg != lastg {
if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev {
return nil
}
}
if g.revs[0].main <= rev {
if g.revs[0].Main <= rev {
return &ki.generations[cg]
}
cg--
@ -338,8 +338,8 @@ func (ki *keyIndex) String() string {
// generation contains multiple revisions of a key.
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
created Revision // when the generation is created (put in first revision).
revs []Revision
}
func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
@ -349,7 +349,7 @@ func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
// walk returns until: 1. it finishes walking all pairs 2. the function returns false.
// walk returns the position at where it stopped. If it stopped after
// finishing walking, -1 will be returned.
func (g *generation) walk(f func(rev revision) bool) int {
func (g *generation) walk(f func(rev Revision) bool) int {
l := len(g.revs)
for i := range g.revs {
ok := f(g.revs[l-i-1])

View File

@ -31,43 +31,43 @@ func TestKeyIndexGet(t *testing.T) {
// {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
// {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
ki := newTestKeyIndex(zaptest.NewLogger(t))
ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{}))
ki.compact(zaptest.NewLogger(t), 4, make(map[Revision]struct{}))
tests := []struct {
rev int64
wmod revision
wcreat revision
wmod Revision
wcreat Revision
wver int64
werr error
}{
{17, revision{}, revision{}, 0, ErrRevisionNotFound},
{16, revision{}, revision{}, 0, ErrRevisionNotFound},
{17, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{16, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 3
{15, revision{14, 1}, revision{14, 0}, 2, nil},
{14, revision{14, 1}, revision{14, 0}, 2, nil},
{15, Revision{Main: 14, Sub: 1}, Revision{Main: 14}, 2, nil},
{14, Revision{Main: 14, Sub: 1}, Revision{Main: 14}, 2, nil},
{13, revision{}, revision{}, 0, ErrRevisionNotFound},
{12, revision{}, revision{}, 0, ErrRevisionNotFound},
{13, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{12, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 2
{11, revision{10, 0}, revision{8, 0}, 2, nil},
{10, revision{10, 0}, revision{8, 0}, 2, nil},
{9, revision{8, 0}, revision{8, 0}, 1, nil},
{8, revision{8, 0}, revision{8, 0}, 1, nil},
{11, Revision{Main: 10}, Revision{Main: 8}, 2, nil},
{10, Revision{Main: 10}, Revision{Main: 8}, 2, nil},
{9, Revision{Main: 8}, Revision{Main: 8}, 1, nil},
{8, Revision{Main: 8}, Revision{Main: 8}, 1, nil},
{7, revision{}, revision{}, 0, ErrRevisionNotFound},
{6, revision{}, revision{}, 0, ErrRevisionNotFound},
{7, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{6, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 1
{5, revision{4, 0}, revision{2, 0}, 2, nil},
{4, revision{4, 0}, revision{2, 0}, 2, nil},
{5, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{4, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{3, revision{}, revision{}, 0, ErrRevisionNotFound},
{2, revision{}, revision{}, 0, ErrRevisionNotFound},
{1, revision{}, revision{}, 0, ErrRevisionNotFound},
{0, revision{}, revision{}, 0, ErrRevisionNotFound},
{3, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{2, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{1, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{0, Revision{}, Revision{}, 0, ErrRevisionNotFound},
}
for i, tt := range tests {
@ -89,13 +89,21 @@ func TestKeyIndexGet(t *testing.T) {
func TestKeyIndexSince(t *testing.T) {
ki := newTestKeyIndex(zaptest.NewLogger(t))
ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{}))
ki.compact(zaptest.NewLogger(t), 4, make(map[Revision]struct{}))
allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}}
allRevs := []Revision{
Revision{Main: 4},
Revision{Main: 6},
Revision{Main: 8},
Revision{Main: 10},
Revision{Main: 12},
Revision{Main: 14, Sub: 1},
Revision{Main: 16},
}
tests := []struct {
rev int64
wrevs []revision
wrevs []Revision
}{
{17, nil},
{16, allRevs[6:]},
@ -131,8 +139,8 @@ func TestKeyIndexPut(t *testing.T) {
wki := &keyIndex{
key: []byte("foo"),
modified: revision{5, 0},
generations: []generation{{created: revision{5, 0}, ver: 1, revs: []revision{{main: 5}}}},
modified: Revision{Main: 5},
generations: []generation{{created: Revision{Main: 5}, ver: 1, revs: []Revision{Revision{Main: 5}}}},
}
if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki)
@ -142,8 +150,8 @@ func TestKeyIndexPut(t *testing.T) {
wki = &keyIndex{
key: []byte("foo"),
modified: revision{7, 0},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}},
modified: Revision{Main: 7},
generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}}},
}
if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki)
@ -152,12 +160,12 @@ func TestKeyIndexPut(t *testing.T) {
func TestKeyIndexRestore(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.restore(zaptest.NewLogger(t), revision{5, 0}, revision{7, 0}, 2)
ki.restore(zaptest.NewLogger(t), Revision{Main: 5}, Revision{Main: 7}, 2)
wki := &keyIndex{
key: []byte("foo"),
modified: revision{7, 0},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 7}}}},
modified: Revision{Main: 7},
generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 7}}}},
}
if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki)
@ -175,8 +183,8 @@ func TestKeyIndexTombstone(t *testing.T) {
wki := &keyIndex{
key: []byte("foo"),
modified: revision{7, 0},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}, {}},
modified: Revision{Main: 7},
generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}}, {}},
}
if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki)
@ -191,10 +199,10 @@ func TestKeyIndexTombstone(t *testing.T) {
wki = &keyIndex{
key: []byte("foo"),
modified: revision{15, 0},
modified: Revision{Main: 15},
generations: []generation{
{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 9}, {main: 15}}},
{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 9}, Revision{Main: 15}}},
{},
},
}
@ -213,241 +221,241 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
compact int64
wki *keyIndex
wam map[revision]struct{}
wam map[Revision]struct{}
}{
{
1,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
{
2,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 2}: {},
map[Revision]struct{}{
Revision{Main: 2}: {},
},
},
{
3,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 2}: {},
map[Revision]struct{}{
Revision{Main: 2}: {},
},
},
{
4,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 4}, Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 4}: {},
map[Revision]struct{}{
Revision{Main: 4}: {},
},
},
{
5,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 4}, Revision{Main: 6}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 4}: {},
map[Revision]struct{}{
Revision{Main: 4}: {},
},
},
{
6,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
{
7,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
{
8,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 8}: {},
map[Revision]struct{}{
Revision{Main: 8}: {},
},
},
{
9,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 8}: {},
map[Revision]struct{}{
Revision{Main: 8}: {},
},
},
{
10,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 10}: {},
map[Revision]struct{}{
Revision{Main: 10}: {},
},
},
{
11,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 10}, Revision{Main: 12}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 10}: {},
map[Revision]struct{}{
Revision{Main: 10}: {},
},
},
{
12,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
{
13,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
{
14,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 14, sub: 1}: {},
map[Revision]struct{}{
Revision{Main: 14, Sub: 1}: {},
},
},
{
15,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}},
{created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{},
},
},
map[revision]struct{}{
{main: 14, sub: 1}: {},
map[Revision]struct{}{
Revision{Main: 14, Sub: 1}: {},
},
},
{
16,
&keyIndex{
key: []byte("foo"),
modified: revision{16, 0},
modified: Revision{Main: 16},
generations: []generation{
{},
},
},
map[revision]struct{}{},
map[Revision]struct{}{},
},
}
// Continuous Compaction and finding Keep
ki := newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests {
am := make(map[revision]struct{})
am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
@ -456,7 +464,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -470,7 +478,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
ki = newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests {
if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
am := make(map[revision]struct{})
am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
@ -479,7 +487,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -494,7 +502,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
// Once Compaction and finding Keep
for i, tt := range tests {
ki := newTestKeyIndex(zaptest.NewLogger(t))
am := make(map[revision]struct{})
am := make(map[Revision]struct{})
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiClone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
@ -502,7 +510,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -525,7 +533,7 @@ func cloneGeneration(g *generation) *generation {
if g.revs == nil {
return &generation{g.ver, g.created, nil}
}
tmp := make([]revision, len(g.revs))
tmp := make([]Revision, len(g.revs))
copy(tmp, g.revs)
return &generation{g.ver, g.created, tmp}
}
@ -536,18 +544,18 @@ func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.put(zaptest.NewLogger(t), 1, 0)
ki.put(zaptest.NewLogger(t), 2, 0)
am := make(map[revision]struct{})
am := make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), 3, am)
wki := &keyIndex{
key: []byte("foo"),
modified: revision{2, 0},
modified: Revision{Main: 2},
generations: []generation{
{created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}},
{created: Revision{Main: 1}, ver: 2, revs: []Revision{Revision{Main: 2}}},
},
}
wam := map[revision]struct{}{
{main: 2}: {},
wam := map[Revision]struct{}{
Revision{Main: 2}: {},
}
if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki)
@ -572,9 +580,9 @@ func TestKeyIndexIsEmpty(t *testing.T) {
{
&keyIndex{
key: []byte("foo"),
modified: revision{2, 0},
modified: Revision{Main: 2},
generations: []generation{
{created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}},
{created: Revision{Main: 1}, ver: 2, revs: []Revision{Revision{Main: 2}}},
},
},
false,
@ -644,7 +652,7 @@ func TestGenerationIsEmpty(t *testing.T) {
}{
{nil, true},
{&generation{}, true},
{&generation{revs: []revision{{main: 1}}}, false},
{&generation{revs: []Revision{Revision{Main: 1}}}, false},
}
for i, tt := range tests {
g := tt.g.isEmpty()
@ -657,19 +665,19 @@ func TestGenerationIsEmpty(t *testing.T) {
func TestGenerationWalk(t *testing.T) {
g := &generation{
ver: 3,
created: revision{2, 0},
revs: []revision{{main: 2}, {main: 4}, {main: 6}},
created: Revision{Main: 2},
revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}},
}
tests := []struct {
f func(rev revision) bool
f func(rev Revision) bool
wi int
}{
{func(rev revision) bool { return rev.main >= 7 }, 2},
{func(rev revision) bool { return rev.main >= 6 }, 1},
{func(rev revision) bool { return rev.main >= 5 }, 1},
{func(rev revision) bool { return rev.main >= 4 }, 0},
{func(rev revision) bool { return rev.main >= 3 }, 0},
{func(rev revision) bool { return rev.main >= 2 }, -1},
{func(rev Revision) bool { return rev.Main >= 7 }, 2},
{func(rev Revision) bool { return rev.Main >= 6 }, 1},
{func(rev Revision) bool { return rev.Main >= 5 }, 1},
{func(rev Revision) bool { return rev.Main >= 4 }, 0},
{func(rev Revision) bool { return rev.Main >= 3 }, 0},
{func(rev Revision) bool { return rev.Main >= 2 }, -1},
}
for i, tt := range tests {
idx := g.walk(tt.f)

View File

@ -37,15 +37,6 @@ var (
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
)
const (
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
)
var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
var minimumBatchInterval = 10 * time.Millisecond
@ -320,9 +311,9 @@ func (s *store) Restore(b backend.Backend) error {
func (s *store) restore() error {
s.setupMetricsReporter()
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
min, max := NewRevBytes(), NewRevBytes()
min = RevToBytes(Revision{Main: 1}, min)
max = RevToBytes(Revision{Main: math.MaxInt64, Sub: math.MaxInt64}, max)
keyToLease := make(map[string]lease.LeaseID)
@ -359,9 +350,9 @@ func (s *store) restore() error {
break
}
// next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
newMin := BytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.Sub++
min = RevToBytes(newMin, min)
}
close(rkvc)
@ -448,18 +439,18 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
ok = true
}
}
rev := bytesToRev(rkv.key)
currentRev = rev.main
rev := BytesToRev(rkv.key)
currentRev = rev.Main
if ok {
if isTombstone(rkv.key) {
if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
if err := ki.tombstone(lg, rev.Main, rev.Sub); err != nil {
lg.Warn("tombstone encountered error", zap.Error(err))
}
continue
}
ki.put(lg, rev.main, rev.sub)
ki.put(lg, rev.Main, rev.Sub)
} else if !isTombstone(rkv.key) {
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version)
idx.Insert(ki)
kiCache[rkv.kstr] = ki
}
@ -519,23 +510,6 @@ func (s *store) setupMetricsReporter() {
reportCompactRevMu.Unlock()
}
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
if len(b) != revBytesLen {
lg.Panic(
"cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)),
)
}
return append(b, markTombstone)
}
// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
func (s *store) HashStorage() HashStorage {
return s.hashes
}

View File

@ -44,7 +44,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8)
for {
var rev revision
var rev Revision
start := time.Now()
@ -52,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
tx.LockOutsideApply()
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for i := range keys {
rev = bytesToRev(keys[i])
rev = BytesToRev(keys[i])
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(schema.Key, keys[i])
keyCompactions++
@ -77,7 +77,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
tx.Unlock()
// update last
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)
// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
// gofail: var compactBeforeCommitBatch struct{}
s.b.ForceCommit()

View File

@ -29,12 +29,12 @@ import (
)
func TestScheduleCompaction(t *testing.T) {
revs := []revision{{1, 0}, {2, 0}, {3, 0}}
revs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}
tests := []struct {
rev int64
keep map[revision]struct{}
wrevs []revision
keep map[Revision]struct{}
wrevs []Revision
}{
// compact at 1 and discard all history
{
@ -51,17 +51,17 @@ func TestScheduleCompaction(t *testing.T) {
// compact at 1 and keeps history one step earlier
{
1,
map[revision]struct{}{
{main: 1}: {},
map[Revision]struct{}{
{Main: 1}: {},
},
revs,
},
// compact at 1 and keeps history two steps earlier
{
3,
map[revision]struct{}{
{main: 2}: {},
{main: 3}: {},
map[Revision]struct{}{
{Main: 2}: {},
{Main: 3}: {},
},
revs[1:],
},
@ -76,9 +76,9 @@ func TestScheduleCompaction(t *testing.T) {
tx := s.b.BatchTx()
tx.Lock()
ibytes := newRevBytes()
ibytes := NewRevBytes()
for _, rev := range revs {
revToBytes(rev, ibytes)
ibytes = RevToBytes(rev, ibytes)
tx.UnsafePut(schema.Key, ibytes, []byte("bar"))
}
tx.Unlock()
@ -90,7 +90,7 @@ func TestScheduleCompaction(t *testing.T) {
tx.Lock()
for _, rev := range tt.wrevs {
revToBytes(rev, ibytes)
ibytes = RevToBytes(rev, ibytes)
keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0)
if len(keys) != 1 {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))

View File

@ -70,22 +70,22 @@ func TestStorePut(t *testing.T) {
}
tests := []struct {
rev revision
rev Revision
r indexGetResp
rr *rangeResp
wrev revision
wrev Revision
wkey []byte
wkv mvccpb.KeyValue
wputrev revision
wputrev Revision
}{
{
revision{1, 0},
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
Revision{Main: 1},
indexGetResp{Revision{}, Revision{}, 0, ErrRevisionNotFound},
nil,
revision{2, 0},
newTestKeyBytes(lg, revision{2, 0}, false),
Revision{Main: 2},
newTestRevBytes(Revision{Main: 2}),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -94,15 +94,15 @@ func TestStorePut(t *testing.T) {
Version: 1,
Lease: 1,
},
revision{2, 0},
Revision{Main: 2},
},
{
revision{1, 1},
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
&rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}},
Revision{Main: 1, Sub: 1},
indexGetResp{Revision{Main: 2}, Revision{Main: 2}, 1, nil},
&rangeResp{[][]byte{newTestRevBytes(Revision{Main: 2, Sub: 1})}, [][]byte{kvb}},
revision{2, 0},
newTestKeyBytes(lg, revision{2, 0}, false),
Revision{Main: 2},
newTestRevBytes(Revision{Main: 2}),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -111,15 +111,15 @@ func TestStorePut(t *testing.T) {
Version: 2,
Lease: 2,
},
revision{2, 0},
Revision{Main: 2},
},
{
revision{2, 0},
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
&rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}},
Revision{Main: 2},
indexGetResp{Revision{Main: 2, Sub: 1}, Revision{Main: 2}, 2, nil},
&rangeResp{[][]byte{newTestRevBytes(Revision{Main: 2, Sub: 1})}, [][]byte{kvb}},
revision{3, 0},
newTestKeyBytes(lg, revision{3, 0}, false),
Revision{Main: 3},
newTestRevBytes(Revision{Main: 3}),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -128,7 +128,7 @@ func TestStorePut(t *testing.T) {
Version: 3,
Lease: 3,
},
revision{3, 0},
Revision{Main: 3},
},
}
for i, tt := range tests {
@ -136,7 +136,7 @@ func TestStorePut(t *testing.T) {
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main
s.currentRev = tt.rev.Main
fi.indexGetRespc <- tt.r
if tt.rr != nil {
b.tx.rangeRespc <- *tt.rr
@ -163,13 +163,13 @@ func TestStorePut(t *testing.T) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{Name: "get", Params: []any{[]byte("foo"), tt.wputrev.main}},
{Name: "get", Params: []any{[]byte("foo"), tt.wputrev.Main}},
{Name: "put", Params: []any{[]byte("foo"), tt.wputrev}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev.main {
if s.currentRev != tt.wrev.Main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
@ -179,7 +179,7 @@ func TestStorePut(t *testing.T) {
func TestStoreRange(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false)
key := newTestRevBytes(Revision{Main: 2})
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -198,11 +198,11 @@ func TestStoreRange(t *testing.T) {
r rangeResp
}{
{
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
indexRangeResp{[][]byte{[]byte("foo")}, []Revision{Revision{Main: 2}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
{
indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []Revision{Revision{Main: 2}, Revision{Main: 3}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}
@ -228,8 +228,8 @@ func TestStoreRange(t *testing.T) {
t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
}
wstart := newRevBytes()
revToBytes(tt.idxr.revs[0], wstart)
wstart := NewRevBytes()
wstart = RevToBytes(tt.idxr.revs[0], wstart)
wact := []testutil.Action{
{Name: "range", Params: []any{schema.Key, wstart, []byte(nil), int64(0)}},
}
@ -252,7 +252,7 @@ func TestStoreRange(t *testing.T) {
func TestStoreDeleteRange(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false)
key := newTestRevBytes(Revision{Main: 2})
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -266,24 +266,24 @@ func TestStoreDeleteRange(t *testing.T) {
}
tests := []struct {
rev revision
rev Revision
r indexRangeResp
rr rangeResp
wkey []byte
wrev revision
wrev Revision
wrrev int64
wdelrev revision
wdelrev Revision
}{
{
revision{2, 0},
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
Revision{Main: 2},
indexRangeResp{[][]byte{[]byte("foo")}, []Revision{{Main: 2}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
newTestKeyBytes(lg, revision{3, 0}, true),
revision{3, 0},
newTestBucketKeyBytes(newBucketKey(3, 0, true)),
Revision{Main: 3},
2,
revision{3, 0},
Revision{Main: 3},
},
}
for i, tt := range tests {
@ -291,7 +291,7 @@ func TestStoreDeleteRange(t *testing.T) {
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main
s.currentRev = tt.rev.Main
fi.indexRangeRespc <- tt.r
b.tx.rangeRespc <- tt.rr
@ -319,7 +319,7 @@ func TestStoreDeleteRange(t *testing.T) {
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev.main {
if s.currentRev != tt.wrev.Main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
s.Close()
@ -334,9 +334,9 @@ func TestStoreCompact(t *testing.T) {
fi := s.kvindex.(*fakeIndex)
s.currentRev = 3
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
fi.indexCompactRespc <- map[Revision]struct{}{Revision{Main: 1}: {}}
key1 := newTestRevBytes(Revision{Main: 1})
key2 := newTestRevBytes(Revision{Main: 2})
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
@ -352,10 +352,10 @@ func TestStoreCompact(t *testing.T) {
wact := []testutil.Action{
{Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(Revision{Main: 3})}},
{Name: "range", Params: []any{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []any{schema.Key, key2}},
{Name: "put", Params: []any{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []any{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(Revision{Main: 3})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -375,7 +375,7 @@ func TestStoreRestore(t *testing.T) {
fi := s.kvindex.(*fakeIndex)
defer s.Close()
putkey := newTestKeyBytes(lg, revision{3, 0}, false)
putkey := newTestRevBytes(Revision{Main: 3})
putkv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
@ -387,7 +387,7 @@ func TestStoreRestore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
delkey := newTestKeyBytes(lg, revision{5, 0}, true)
delkey := newTestBucketKeyBytes(newBucketKey(5, 0, true))
delkv := mvccpb.KeyValue{
Key: []byte("foo"),
}
@ -395,8 +395,8 @@ func TestStoreRestore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(Revision{Main: 3})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(Revision{Main: 3})}}
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil}
@ -412,17 +412,17 @@ func TestStoreRestore(t *testing.T) {
wact := []testutil.Action{
{Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []any{schema.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
{Name: "range", Params: []any{schema.Key, newTestRevBytes(Revision{Main: 1}), newTestRevBytes(Revision{Main: math.MaxInt64, Sub: math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
}
gens := []generation{
{created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
{created: revision{0, 0}, ver: 0, revs: nil},
{created: Revision{Main: 4}, ver: 2, revs: []Revision{Revision{Main: 3}, Revision{Main: 5}}},
{created: Revision{Main: 0}, ver: 0, revs: nil},
}
ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
ki := &keyIndex{key: []byte("foo"), modified: Revision{Main: 5}, generations: gens}
wact = []testutil.Action{
{Name: "keyIndex", Params: []any{ki}},
{Name: "insert", Params: []any{ki}},
@ -496,8 +496,6 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
UnsafeSetScheduledCompact(tx, 2)
@ -524,8 +522,8 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)
revbytes := NewRevBytes()
revbytes = BucketKeyToBytes(newBucketKey(1, 0, false), revbytes)
// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
@ -540,7 +538,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
t.Errorf("key for rev %+v still exists, want deleted", BytesToBucketKey(revbytes))
})
}
}
@ -605,7 +603,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
}()
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
// Compact the store in a goroutine, using RevisionTombstone 9900 to 10000 and close donec when finished
go func() {
defer close(donec)
for i := 100; i >= 0; i-- {
@ -629,7 +627,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
// with a past RevisionTombstone (lower than compacted), a future RevisionTombstone, and the exact compacted RevisionTombstone
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
@ -662,7 +660,7 @@ func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
}
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
// correct hash value with latest revision.
// correct hash value with latest RevisionTombstone.
func TestHashKVZeroRevision(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
@ -882,19 +880,14 @@ func merge(dst, src kvs) kvs {
// TODO: test attach key to lessor
func newTestRevBytes(rev revision) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
return bytes
func newTestRevBytes(rev Revision) []byte {
bytes := NewRevBytes()
return RevToBytes(rev, bytes)
}
func newTestKeyBytes(lg *zap.Logger, rev revision, tombstone bool) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
if tombstone {
bytes = appendMarkTombstone(lg, bytes)
}
return bytes
func newTestBucketKeyBytes(rev BucketKey) []byte {
bytes := NewRevBytes()
return BucketKeyToBytes(rev, bytes)
}
func newFakeStore(lg *zap.Logger) *store {
@ -926,7 +919,7 @@ func newFakeIndex() *fakeIndex {
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1),
indexCompactRespc: make(chan map[Revision]struct{}, 1),
}
}
@ -986,19 +979,19 @@ func (b *fakeBackend) Close() error
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
type indexGetResp struct {
rev revision
created revision
rev Revision
created Revision
ver int64
err error
}
type indexRangeResp struct {
keys [][]byte
revs []revision
revs []Revision
}
type indexRangeEventsResp struct {
revs []revision
revs []Revision
}
type fakeIndex struct {
@ -1006,10 +999,10 @@ type fakeIndex struct {
indexGetRespc chan indexGetResp
indexRangeRespc chan indexRangeResp
indexRangeEventsRespc chan indexRangeEventsResp
indexCompactRespc chan map[revision]struct{}
indexCompactRespc chan map[Revision]struct{}
}
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) {
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]Revision, int) {
_, rev := i.Range(key, end, atRev)
if len(rev) >= limit {
rev = rev[:limit]
@ -1022,33 +1015,33 @@ func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
return len(rev)
}
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created Revision, ver int64, err error) {
i.Recorder.Record(testutil.Action{Name: "get", Params: []any{key, atRev}})
r := <-i.indexGetRespc
return r.rev, r.created, r.ver, r.err
}
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []Revision) {
i.Recorder.Record(testutil.Action{Name: "range", Params: []any{key, end, atRev}})
r := <-i.indexRangeRespc
return r.keys, r.revs
}
func (i *fakeIndex) Put(key []byte, rev revision) {
func (i *fakeIndex) Put(key []byte, rev Revision) {
i.Recorder.Record(testutil.Action{Name: "put", Params: []any{key, rev}})
}
func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
func (i *fakeIndex) Tombstone(key []byte, rev Revision) error {
i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []any{key, rev}})
return nil
}
func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []Revision {
i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []any{key, end, rev}})
r := <-i.indexRangeEventsRespc
return r.revs
}
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
func (i *fakeIndex) Compact(rev int64) map[Revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []any{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
func (i *fakeIndex) Keep(rev int64) map[Revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "keep", Params: []any{rev}})
return <-i.indexCompactRespc
}

View File

@ -97,20 +97,20 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev
}
kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
revBytes := NewRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
select {
case <-ctx.Done():
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
default:
}
revToBytes(revpair, revBytes)
revBytes = RevToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
if len(vs) != 1 {
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
zap.Int64("revision-main", revpair.Main),
zap.Int64("revision-sub", revpair.Sub),
zap.Int64("revision-current", curRev),
zap.Int64("range-option-rev", ro.Rev),
zap.Int64("range-option-limit", ro.Limit),
@ -202,13 +202,13 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
c = created.Main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
tw.trace.Step("get key's previous created_revision and leaseID")
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes := NewRevBytes()
idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
ibytes = RevToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
@ -279,11 +279,9 @@ func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
}
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnCommon.s.lg, ibytes)
ibytes := NewRevBytes()
idxRev := newBucketKey(tw.beginRev+1, int64(len(tw.changes)), true)
ibytes = BucketKeyToBytes(idxRev, ibytes)
kv := mvccpb.KeyValue{Key: key}
@ -296,7 +294,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
}
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
err = tw.s.kvindex.Tombstone(key, idxRev.Revision)
if err != nil {
tw.storeTxnCommon.s.lg.Fatal(
"failed to tombstone an existing key",

View File

@ -14,48 +14,97 @@
package mvcc
import "encoding/binary"
import (
"encoding/binary"
)
// revBytesLen is the byte length of a normal revision.
// First 8 bytes is the revision.main in big-endian format. The 9th byte
// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
const revBytesLen = 8 + 1 + 8
const (
// revBytesLen is the byte length of a normal revision.
// First 8 bytes is the revision.main in big-endian format. The 9th byte
// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
revBytesLen = 8 + 1 + 8
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
)
// A revision indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the sub revision of a change in a set of changes that happen
type Revision struct {
// Main is the main revision of a set of changes that happen atomically.
Main int64
// Sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
Sub int64
}
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
func (a Revision) GreaterThan(b Revision) bool {
if a.Main > b.Main {
return true
}
if a.main < b.main {
if a.Main < b.Main {
return false
}
return a.sub > b.sub
return a.Sub > b.Sub
}
func newRevBytes() []byte {
func RevToBytes(rev Revision, bytes []byte) []byte {
return BucketKeyToBytes(newBucketKey(rev.Main, rev.Sub, false), bytes)
}
func BytesToRev(bytes []byte) Revision {
return BytesToBucketKey(bytes).Revision
}
// BucketKey indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type BucketKey struct {
Revision
tombstone bool
}
func newBucketKey(main, sub int64, isTombstone bool) BucketKey {
return BucketKey{
Revision: Revision{
Main: main,
Sub: sub,
},
tombstone: isTombstone,
}
}
func NewRevBytes() []byte {
return make([]byte, revBytesLen, markedRevBytesLen)
}
func revToBytes(rev revision, bytes []byte) {
binary.BigEndian.PutUint64(bytes, uint64(rev.main))
func BucketKeyToBytes(rev BucketKey, bytes []byte) []byte {
binary.BigEndian.PutUint64(bytes, uint64(rev.Main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.Sub))
if rev.tombstone {
switch len(bytes) {
case revBytesLen:
bytes = append(bytes, markTombstone)
case markedRevBytesLen:
bytes[markBytePosition] = markTombstone
}
}
return bytes
}
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
func BytesToBucketKey(bytes []byte) BucketKey {
return BucketKey{
Revision: Revision{
Main: int64(binary.BigEndian.Uint64(bytes[0:8])),
Sub: int64(binary.BigEndian.Uint64(bytes[9:])),
},
tombstone: isTombstone(bytes),
}
}
// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}

View File

@ -1,53 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"math"
"reflect"
"testing"
)
// TestRevision tests that revision could be encoded to and decoded from
// bytes slice. Moreover, the lexicographical order of its byte slice representation
// follows the order of (main, sub).
func TestRevision(t *testing.T) {
tests := []revision{
// order in (main, sub)
{},
{main: 1, sub: 0},
{main: 1, sub: 1},
{main: 2, sub: 0},
{main: math.MaxInt64, sub: math.MaxInt64},
}
bs := make([][]byte, len(tests))
for i, tt := range tests {
b := newRevBytes()
revToBytes(tt, b)
bs[i] = b
if grev := bytesToRev(b); !reflect.DeepEqual(grev, tt) {
t.Errorf("#%d: revision = %+v, want %+v", i, grev, tt)
}
}
for i := 0; i < len(tests)-1; i++ {
if bytes.Compare(bs[i], bs[i+1]) >= 0 {
t.Errorf("#%d: %v (%+v) should be smaller than %v (%+v)", i, bs[i], tests[i], bs[i+1], tests[i+1])
}
}
}

View File

@ -22,7 +22,7 @@ import (
func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
return BytesToRev(finishedCompactBytes[0]).Main, true
}
return 0, false
}
@ -30,7 +30,7 @@ func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, f
func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true
return BytesToRev(scheduledCompactBytes[0]).Main, true
}
return 0, false
}
@ -42,8 +42,8 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) {
}
func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
rbytes := NewRevBytes()
rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}
@ -54,7 +54,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) {
}
func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
rbytes := NewRevBytes()
rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
}

View File

@ -352,9 +352,9 @@ func (s *watchableStore) syncWatchers() int {
compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes)
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
@ -428,7 +428,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
// patch in mod revision so watchers won't skip
kv.ModRevision = bytesToRev(revs[i]).main
kv.ModRevision = BytesToRev(revs[i]).Main
}
evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
}