Refactor common revision code to pkg

Signed-off-by: Allen Ray <alray@redhat.com>
This commit is contained in:
Allen Ray 2023-07-18 12:06:43 -04:00
parent 1c5289dd73
commit 395376d3ab
17 changed files with 470 additions and 554 deletions

View File

@ -1,53 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package snapshot
import (
"encoding/binary"
)
type revision struct {
main int64
sub int64
}
// GreaterThan should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub
}
// bytesToRev should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}
// revToBytes should be synced with function in server
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func revToBytes(bytes []byte, rev revision) {
binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}

View File

@ -156,8 +156,8 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("cannot write to bucket %s", herr.Error()) return fmt.Errorf("cannot write to bucket %s", herr.Error())
} }
if iskeyb { if iskeyb {
rev := bytesToRev(k) rev := mvcc.BytesToRev(k)
ds.Revision = rev.main ds.Revision = rev.Main
} }
ds.TotalKey++ ds.TotalKey++
return nil return nil
@ -346,42 +346,42 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
return err return err
} }
latest = s.unsafeBumpRevision(tx, latest, int64(bumpAmount)) latest = s.unsafeBumpBucketsRevision(tx, latest, int64(bumpAmount))
s.unsafeMarkRevisionCompacted(tx, latest) s.unsafeMarkRevisionCompacted(tx, latest)
return nil return nil
} }
func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision { func (s *v3Manager) unsafeBumpBucketsRevision(tx backend.UnsafeWriter, latest mvcc.Revision, amount int64) mvcc.Revision {
s.lg.Info( s.lg.Info(
"bumping latest revision", "bumping latest revision",
zap.Int64("latest-revision", latest.main), zap.Int64("latest-revision", latest.Main),
zap.Int64("bump-amount", amount), zap.Int64("bump-amount", amount),
zap.Int64("new-latest-revision", latest.main+amount), zap.Int64("new-latest-revision", latest.Main+amount),
) )
latest.main += amount latest.Main += amount
latest.sub = 0 latest.Sub = 0
k := make([]byte, 17) k := mvcc.NewRevBytes()
revToBytes(k, latest) k = mvcc.RevToBytes(latest, k)
tx.UnsafePut(schema.Key, k, []byte{}) tx.UnsafePut(schema.Key, k, []byte{})
return latest return latest
} }
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) { func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest mvcc.Revision) {
s.lg.Info( s.lg.Info(
"marking revision compacted", "marking revision compacted",
zap.Int64("revision", latest.main), zap.Int64("revision", latest.Main),
) )
mvcc.UnsafeSetScheduledCompact(tx, latest.main) mvcc.UnsafeSetScheduledCompact(tx, latest.Main)
} }
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) { func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (mvcc.Revision, error) {
var latest revision var latest mvcc.Revision
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) { err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
rev := bytesToRev(k) rev := mvcc.BytesToRev(k)
if rev.GreaterThan(latest) { if rev.GreaterThan(latest) {
latest = rev latest = rev

View File

@ -30,7 +30,7 @@ const (
hashStorageMaxSize = 10 hashStorageMaxSize = 10
) )
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[Revision]struct{}) (KeyValueHash, error) {
h := newKVHasher(compactRevision, revision, keep) h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error { err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v) h.WriteKeyValue(k, v)
@ -43,10 +43,10 @@ type kvHasher struct {
hash hash.Hash32 hash hash.Hash32
compactRevision int64 compactRevision int64
revision int64 revision int64
keep map[revision]struct{} keep map[Revision]struct{}
} }
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher { func newKVHasher(compactRev, rev int64, keep map[Revision]struct{}) kvHasher {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(schema.Key.Name()) h.Write(schema.Key.Name())
return kvHasher{ return kvHasher{
@ -58,12 +58,12 @@ func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
} }
func (h *kvHasher) WriteKeyValue(k, v []byte) { func (h *kvHasher) WriteKeyValue(k, v []byte) {
kr := bytesToRev(k) kr := BytesToRev(k)
upper := revision{main: h.revision + 1} upper := Revision{Main: h.revision + 1}
if !upper.GreaterThan(kr) { if !upper.GreaterThan(kr) {
return return
} }
lower := revision{main: h.compactRevision + 1} lower := Revision{Main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion // skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one. // due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(h.keep) > 0 { if lower.GreaterThan(kr) && len(h.keep) > 0 {

View File

@ -22,14 +22,14 @@ import (
) )
type index interface { type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) Get(key []byte, atRev int64) (rev, created Revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision) Range(key, end []byte, atRev int64) ([][]byte, []Revision)
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) Revisions(key, end []byte, atRev int64, limit int) ([]Revision, int)
CountRevisions(key, end []byte, atRev int64) int CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision) Put(key []byte, rev Revision)
Tombstone(key []byte, rev revision) error Tombstone(key []byte, rev Revision) error
Compact(rev int64) map[revision]struct{} Compact(rev int64) map[Revision]struct{}
Keep(rev int64) map[revision]struct{} Keep(rev int64) map[Revision]struct{}
Equal(b index) bool Equal(b index) bool
Insert(ki *keyIndex) Insert(ki *keyIndex)
@ -51,30 +51,30 @@ func newTreeIndex(lg *zap.Logger) index {
} }
} }
func (ti *treeIndex) Put(key []byte, rev revision) { func (ti *treeIndex) Put(key []byte, rev Revision) {
keyi := &keyIndex{key: key} keyi := &keyIndex{key: key}
ti.Lock() ti.Lock()
defer ti.Unlock() defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi) okeyi, ok := ti.tree.Get(keyi)
if !ok { if !ok {
keyi.put(ti.lg, rev.main, rev.sub) keyi.put(ti.lg, rev.Main, rev.Sub)
ti.tree.ReplaceOrInsert(keyi) ti.tree.ReplaceOrInsert(keyi)
return return
} }
okeyi.put(ti.lg, rev.main, rev.sub) okeyi.put(ti.lg, rev.Main, rev.Sub)
} }
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
ti.RLock() ti.RLock()
defer ti.RUnlock() defer ti.RUnlock()
return ti.unsafeGet(key, atRev) return ti.unsafeGet(key, atRev)
} }
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) { func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
keyi := &keyIndex{key: key} keyi := &keyIndex{key: key}
if keyi = ti.keyIndex(keyi); keyi == nil { if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound return Revision{}, Revision{}, 0, ErrRevisionNotFound
} }
return keyi.get(ti.lg, atRev) return keyi.get(ti.lg, atRev)
} }
@ -109,7 +109,7 @@ func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
// Revisions returns limited number of revisions from key(included) to end(excluded) // Revisions returns limited number of revisions from key(included) to end(excluded)
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0. // at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
// The second return parameter isn't capped by the limit and reflects the total number of revisions. // The second return parameter isn't capped by the limit and reflects the total number of revisions.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) { func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {
ti.RLock() ti.RLock()
defer ti.RUnlock() defer ti.RUnlock()
@ -118,7 +118,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
if err != nil { if err != nil {
return nil, 0 return nil, 0
} }
return []revision{rev}, 1 return []Revision{rev}, 1
} }
ti.unsafeVisit(key, end, func(ki *keyIndex) bool { ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
@ -155,7 +155,7 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
return total return total
} }
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) { func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []Revision) {
ti.RLock() ti.RLock()
defer ti.RUnlock() defer ti.RUnlock()
@ -164,7 +164,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
if err != nil { if err != nil {
return nil, nil return nil, nil
} }
return [][]byte{key}, []revision{rev} return [][]byte{key}, []Revision{rev}
} }
ti.unsafeVisit(key, end, func(ki *keyIndex) bool { ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
@ -176,7 +176,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return keys, revs return keys, revs
} }
func (ti *treeIndex) Tombstone(key []byte, rev revision) error { func (ti *treeIndex) Tombstone(key []byte, rev Revision) error {
keyi := &keyIndex{key: key} keyi := &keyIndex{key: key}
ti.Lock() ti.Lock()
@ -186,11 +186,11 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
return ErrRevisionNotFound return ErrRevisionNotFound
} }
return ki.tombstone(ti.lg, rev.main, rev.sub) return ki.tombstone(ti.lg, rev.Main, rev.Sub)
} }
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
available := make(map[revision]struct{}) available := make(map[Revision]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev)) ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock() ti.Lock()
clone := ti.tree.Clone() clone := ti.tree.Clone()
@ -214,8 +214,8 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
} }
// Keep finds all revisions to be kept for a Compaction at the given rev. // Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} { func (ti *treeIndex) Keep(rev int64) map[Revision]struct{} {
available := make(map[revision]struct{}) available := make(map[Revision]struct{})
ti.RLock() ti.RLock()
defer ti.RUnlock() defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool { ti.tree.Ascend(func(keyi *keyIndex) bool {

View File

@ -33,7 +33,7 @@ func benchmarkIndexCompact(b *testing.B, size int) {
bytesN := 64 bytesN := 64
keys := createBytesSlice(bytesN, size) keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ { for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
} }
b.ResetTimer() b.ResetTimer()
for i := 1; i < b.N; i++ { for i := 1; i < b.N; i++ {
@ -49,7 +49,7 @@ func BenchmarkIndexPut(b *testing.B) {
keys := createBytesSlice(bytesN, b.N) keys := createBytesSlice(bytesN, b.N)
b.ResetTimer() b.ResetTimer()
for i := 1; i < b.N; i++ { for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
} }
} }
@ -60,7 +60,7 @@ func BenchmarkIndexGet(b *testing.B) {
bytesN := 64 bytesN := 64
keys := createBytesSlice(bytesN, b.N) keys := createBytesSlice(bytesN, b.N)
for i := 1; i < b.N; i++ { for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) kvindex.Put(keys[i], Revision{Main: int64(i), Sub: int64(i)})
} }
b.ResetTimer() b.ResetTimer()
for i := 1; i < b.N; i++ { for i := 1; i < b.N; i++ {

View File

@ -24,25 +24,25 @@ import (
func TestIndexGet(t *testing.T) { func TestIndexGet(t *testing.T) {
ti := newTreeIndex(zaptest.NewLogger(t)) ti := newTreeIndex(zaptest.NewLogger(t))
ti.Put([]byte("foo"), revision{main: 2}) ti.Put([]byte("foo"), Revision{Main: 2})
ti.Put([]byte("foo"), revision{main: 4}) ti.Put([]byte("foo"), Revision{Main: 4})
ti.Tombstone([]byte("foo"), revision{main: 6}) ti.Tombstone([]byte("foo"), Revision{Main: 6})
tests := []struct { tests := []struct {
rev int64 rev int64
wrev revision wrev Revision
wcreated revision wcreated Revision
wver int64 wver int64
werr error werr error
}{ }{
{0, revision{}, revision{}, 0, ErrRevisionNotFound}, {0, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{1, revision{}, revision{}, 0, ErrRevisionNotFound}, {1, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{2, revision{main: 2}, revision{main: 2}, 1, nil}, {2, Revision{Main: 2}, Revision{Main: 2}, 1, nil},
{3, revision{main: 2}, revision{main: 2}, 1, nil}, {3, Revision{Main: 2}, Revision{Main: 2}, 1, nil},
{4, revision{main: 4}, revision{main: 2}, 2, nil}, {4, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{5, revision{main: 4}, revision{main: 2}, 2, nil}, {5, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{6, revision{}, revision{}, 0, ErrRevisionNotFound}, {6, Revision{}, Revision{}, 0, ErrRevisionNotFound},
} }
for i, tt := range tests { for i, tt := range tests {
rev, created, ver, err := ti.Get([]byte("foo"), tt.rev) rev, created, ver, err := ti.Get([]byte("foo"), tt.rev)
@ -63,7 +63,7 @@ func TestIndexGet(t *testing.T) {
func TestIndexRange(t *testing.T) { func TestIndexRange(t *testing.T) {
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
allRevs := []revision{{main: 1}, {main: 2}, {main: 3}} allRevs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}
ti := newTreeIndex(zaptest.NewLogger(t)) ti := newTreeIndex(zaptest.NewLogger(t))
for i := range allKeys { for i := range allKeys {
@ -74,7 +74,7 @@ func TestIndexRange(t *testing.T) {
tests := []struct { tests := []struct {
key, end []byte key, end []byte
wkeys [][]byte wkeys [][]byte
wrevs []revision wrevs []Revision
}{ }{
// single key that not found // single key that not found
{ {
@ -122,9 +122,9 @@ func TestIndexRange(t *testing.T) {
func TestIndexTombstone(t *testing.T) { func TestIndexTombstone(t *testing.T) {
ti := newTreeIndex(zaptest.NewLogger(t)) ti := newTreeIndex(zaptest.NewLogger(t))
ti.Put([]byte("foo"), revision{main: 1}) ti.Put([]byte("foo"), Revision{Main: 1})
err := ti.Tombstone([]byte("foo"), revision{main: 2}) err := ti.Tombstone([]byte("foo"), Revision{Main: 2})
if err != nil { if err != nil {
t.Errorf("tombstone error = %v, want nil", err) t.Errorf("tombstone error = %v, want nil", err)
} }
@ -133,7 +133,7 @@ func TestIndexTombstone(t *testing.T) {
if err != ErrRevisionNotFound { if err != ErrRevisionNotFound {
t.Errorf("get error = %v, want ErrRevisionNotFound", err) t.Errorf("get error = %v, want ErrRevisionNotFound", err)
} }
err = ti.Tombstone([]byte("foo"), revision{main: 3}) err = ti.Tombstone([]byte("foo"), Revision{Main: 3})
if err != ErrRevisionNotFound { if err != ErrRevisionNotFound {
t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound) t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound)
} }
@ -141,7 +141,7 @@ func TestIndexTombstone(t *testing.T) {
func TestIndexRevision(t *testing.T) { func TestIndexRevision(t *testing.T) {
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")}
allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} allRevs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 5}, Revision{Main: 6}}
ti := newTreeIndex(zaptest.NewLogger(t)) ti := newTreeIndex(zaptest.NewLogger(t))
for i := range allKeys { for i := range allKeys {
@ -152,7 +152,7 @@ func TestIndexRevision(t *testing.T) {
key, end []byte key, end []byte
atRev int64 atRev int64
limit int limit int
wrevs []revision wrevs []Revision
wcounts int wcounts int
}{ }{
// single key that not found // single key that not found
@ -161,23 +161,23 @@ func TestIndexRevision(t *testing.T) {
}, },
// single key that found // single key that found
{ {
[]byte("foo"), nil, 6, 0, []revision{{main: 6}}, 1, []byte("foo"), nil, 6, 0, []Revision{Revision{Main: 6}}, 1,
}, },
// various range keys, fixed atRev, unlimited // various range keys, fixed atRev, unlimited
{ {
[]byte("foo"), []byte("foo1"), 6, 0, []revision{{main: 6}}, 1, []byte("foo"), []byte("foo1"), 6, 0, []Revision{Revision{Main: 6}}, 1,
}, },
{ {
[]byte("foo"), []byte("foo2"), 6, 0, []revision{{main: 6}, {main: 5}}, 2, []byte("foo"), []byte("foo2"), 6, 0, []Revision{Revision{Main: 6}, Revision{Main: 5}}, 2,
}, },
{ {
[]byte("foo"), []byte("fop"), 6, 0, []revision{{main: 6}, {main: 5}, {main: 4}}, 3, []byte("foo"), []byte("fop"), 6, 0, []Revision{Revision{Main: 6}, Revision{Main: 5}, Revision{Main: 4}}, 3,
}, },
{ {
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2, []byte("foo1"), []byte("fop"), 6, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
}, },
{ {
[]byte("foo2"), []byte("fop"), 6, 0, []revision{{main: 4}}, 1, []byte("foo2"), []byte("fop"), 6, 0, []Revision{Revision{Main: 4}}, 1,
}, },
{ {
[]byte("foo3"), []byte("fop"), 6, 0, nil, 0, []byte("foo3"), []byte("fop"), 6, 0, nil, 0,
@ -187,38 +187,38 @@ func TestIndexRevision(t *testing.T) {
[]byte("foo1"), []byte("fop"), 1, 0, nil, 0, []byte("foo1"), []byte("fop"), 1, 0, nil, 0,
}, },
{ {
[]byte("foo1"), []byte("fop"), 2, 0, []revision{{main: 2}}, 1, []byte("foo1"), []byte("fop"), 2, 0, []Revision{Revision{Main: 2}}, 1,
}, },
{ {
[]byte("foo1"), []byte("fop"), 3, 0, []revision{{main: 2}, {main: 3}}, 2, []byte("foo1"), []byte("fop"), 3, 0, []Revision{Revision{Main: 2}, Revision{Main: 3}}, 2,
}, },
{ {
[]byte("foo1"), []byte("fop"), 4, 0, []revision{{main: 2}, {main: 4}}, 2, []byte("foo1"), []byte("fop"), 4, 0, []Revision{Revision{Main: 2}, Revision{Main: 4}}, 2,
}, },
{ {
[]byte("foo1"), []byte("fop"), 5, 0, []revision{{main: 5}, {main: 4}}, 2, []byte("foo1"), []byte("fop"), 5, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
}, },
{ {
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2, []byte("foo1"), []byte("fop"), 6, 0, []Revision{Revision{Main: 5}, Revision{Main: 4}}, 2,
}, },
// fixed range keys, fixed atRev, various limit // fixed range keys, fixed atRev, various limit
{ {
[]byte("foo"), []byte("fop"), 6, 1, []revision{{main: 6}}, 3, []byte("foo"), []byte("fop"), 6, 1, []Revision{Revision{Main: 6}}, 3,
}, },
{ {
[]byte("foo"), []byte("fop"), 6, 2, []revision{{main: 6}, {main: 5}}, 3, []byte("foo"), []byte("fop"), 6, 2, []Revision{Revision{Main: 6}, Revision{Main: 5}}, 3,
}, },
{ {
[]byte("foo"), []byte("fop"), 6, 3, []revision{{main: 6}, {main: 5}, {main: 4}}, 3, []byte("foo"), []byte("fop"), 6, 3, []Revision{Revision{Main: 6}, Revision{Main: 5}, Revision{Main: 4}}, 3,
}, },
{ {
[]byte("foo"), []byte("fop"), 3, 1, []revision{{main: 1}}, 3, []byte("foo"), []byte("fop"), 3, 1, []Revision{Revision{Main: 1}}, 3,
}, },
{ {
[]byte("foo"), []byte("fop"), 3, 2, []revision{{main: 1}, {main: 2}}, 3, []byte("foo"), []byte("fop"), 3, 2, []Revision{Revision{Main: 1}, Revision{Main: 2}}, 3,
}, },
{ {
[]byte("foo"), []byte("fop"), 3, 3, []revision{{main: 1}, {main: 2}, {main: 3}}, 3, []byte("foo"), []byte("fop"), 3, 3, []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}, 3,
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -238,21 +238,21 @@ func TestIndexCompactAndKeep(t *testing.T) {
tests := []struct { tests := []struct {
key []byte key []byte
remove bool remove bool
rev revision rev Revision
created revision created Revision
ver int64 ver int64
}{ }{
{[]byte("foo"), false, revision{main: 1}, revision{main: 1}, 1}, {[]byte("foo"), false, Revision{Main: 1}, Revision{Main: 1}, 1},
{[]byte("foo1"), false, revision{main: 2}, revision{main: 2}, 1}, {[]byte("foo1"), false, Revision{Main: 2}, Revision{Main: 2}, 1},
{[]byte("foo2"), false, revision{main: 3}, revision{main: 3}, 1}, {[]byte("foo2"), false, Revision{Main: 3}, Revision{Main: 3}, 1},
{[]byte("foo2"), false, revision{main: 4}, revision{main: 3}, 2}, {[]byte("foo2"), false, Revision{Main: 4}, Revision{Main: 3}, 2},
{[]byte("foo"), false, revision{main: 5}, revision{main: 1}, 2}, {[]byte("foo"), false, Revision{Main: 5}, Revision{Main: 1}, 2},
{[]byte("foo1"), false, revision{main: 6}, revision{main: 2}, 2}, {[]byte("foo1"), false, Revision{Main: 6}, Revision{Main: 2}, 2},
{[]byte("foo1"), true, revision{main: 7}, revision{}, 0}, {[]byte("foo1"), true, Revision{Main: 7}, Revision{}, 0},
{[]byte("foo2"), true, revision{main: 8}, revision{}, 0}, {[]byte("foo2"), true, Revision{Main: 8}, Revision{}, 0},
{[]byte("foo"), true, revision{main: 9}, revision{}, 0}, {[]byte("foo"), true, Revision{Main: 9}, Revision{}, 0},
{[]byte("foo"), false, revision{10, 0}, revision{10, 0}, 1}, {[]byte("foo"), false, Revision{Main: 10}, Revision{Main: 10}, 1},
{[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1}, {[]byte("foo1"), false, Revision{Main: 10, Sub: 1}, Revision{Main: 10, Sub: 1}, 1},
} }
// Continuous Compact and Keep // Continuous Compact and Keep
@ -274,7 +274,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
return aki.Less(bki) return aki.Less(bki)
})} })}
for _, tt := range tests { for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove { if tt.remove {
wti.Tombstone(tt.key, tt.rev) wti.Tombstone(tt.key, tt.rev)
} else { } else {
@ -306,7 +306,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
return aki.Less(bki) return aki.Less(bki)
})} })}
for _, tt := range tests { for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) {
if tt.remove { if tt.remove {
wti.Tombstone(tt.key, tt.rev) wti.Tombstone(tt.key, tt.rev)
} else { } else {
@ -320,7 +320,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
} }
} }
func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { func restore(ti *treeIndex, key []byte, created, modified Revision, ver int64) {
keyi := &keyIndex{key: key} keyi := &keyIndex{key: key}
ti.Lock() ti.Lock()
@ -331,5 +331,5 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
ti.tree.ReplaceOrInsert(keyi) ti.tree.ReplaceOrInsert(keyi)
return return
} }
okeyi.put(ti.lg, modified.main, modified.sub) okeyi.put(ti.lg, modified.Main, modified.Sub)
} }

View File

@ -73,21 +73,21 @@ var (
// {empty} -> key SHOULD be removed. // {empty} -> key SHOULD be removed.
type keyIndex struct { type keyIndex struct {
key []byte key []byte
modified revision // the main rev of the last modification modified Revision // the main rev of the last modification
generations []generation generations []generation
} }
// put puts a revision to the keyIndex. // put puts a revision to the keyIndex.
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub} rev := Revision{Main: main, Sub: sub}
if !rev.GreaterThan(ki.modified) { if !rev.GreaterThan(ki.modified) {
lg.Panic( lg.Panic(
"'put' with an unexpected smaller revision", "'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main), zap.Int64("given-revision-main", rev.Main),
zap.Int64("given-revision-sub", rev.sub), zap.Int64("given-revision-sub", rev.Sub),
zap.Int64("modified-revision-main", ki.modified.main), zap.Int64("modified-revision-main", ki.modified.Main),
zap.Int64("modified-revision-sub", ki.modified.sub), zap.Int64("modified-revision-sub", ki.modified.Sub),
) )
} }
if len(ki.generations) == 0 { if len(ki.generations) == 0 {
@ -103,7 +103,7 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
ki.modified = rev ki.modified = rev
} }
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int64) {
if len(ki.generations) != 0 { if len(ki.generations) != 0 {
lg.Panic( lg.Panic(
"'restore' got an unexpected non-empty generations", "'restore' got an unexpected non-empty generations",
@ -112,7 +112,7 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6
} }
ki.modified = modified ki.modified = modified
g := generation{created: created, ver: ver, revs: []revision{modified}} g := generation{created: created, ver: ver, revs: []Revision{modified}}
ki.generations = append(ki.generations, g) ki.generations = append(ki.generations, g)
keysGauge.Inc() keysGauge.Inc()
} }
@ -138,7 +138,7 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
// get gets the modified, created revision and version of the key that satisfies the given atRev. // get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be smaller than or equal to the given atRev. // Rev must be smaller than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {
if ki.isEmpty() { if ki.isEmpty() {
lg.Panic( lg.Panic(
"'get' got an unexpected empty keyIndex", "'get' got an unexpected empty keyIndex",
@ -147,28 +147,28 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision
} }
g := ki.findGeneration(atRev) g := ki.findGeneration(atRev)
if g.isEmpty() { if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound return Revision{}, Revision{}, 0, ErrRevisionNotFound
} }
n := g.walk(func(rev revision) bool { return rev.main > atRev }) n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
if n != -1 { if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
} }
return revision{}, revision{}, 0, ErrRevisionNotFound return Revision{}, Revision{}, 0, ErrRevisionNotFound
} }
// since returns revisions since the given rev. Only the revision with the // since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same // largest sub revision will be returned if multiple revisions have the same
// main revision. // main revision.
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { func (ki *keyIndex) since(lg *zap.Logger, rev int64) []Revision {
if ki.isEmpty() { if ki.isEmpty() {
lg.Panic( lg.Panic(
"'since' got an unexpected empty keyIndex", "'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)), zap.String("key", string(ki.key)),
) )
} }
since := revision{rev, 0} since := Revision{Main: rev}
var gi int var gi int
// find the generations to start checking // find the generations to start checking
for gi = len(ki.generations) - 1; gi > 0; gi-- { for gi = len(ki.generations) - 1; gi > 0; gi-- {
@ -181,21 +181,21 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
} }
} }
var revs []revision var revs []Revision
var last int64 var last int64
for ; gi < len(ki.generations); gi++ { for ; gi < len(ki.generations); gi++ {
for _, r := range ki.generations[gi].revs { for _, r := range ki.generations[gi].revs {
if since.GreaterThan(r) { if since.GreaterThan(r) {
continue continue
} }
if r.main == last { if r.Main == last {
// replace the revision with a new one that has higher sub value, // replace the revision with a new one that has higher sub value,
// because the original one should not be seen by external // because the original one should not be seen by external
revs[len(revs)-1] = r revs[len(revs)-1] = r
continue continue
} }
revs = append(revs, r) revs = append(revs, r)
last = r.main last = r.Main
} }
} }
return revs return revs
@ -205,7 +205,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
// revision than the given atRev except the largest one (If the largest one is // revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept). // a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed. // If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]struct{}) {
if ki.isEmpty() { if ki.isEmpty() {
lg.Panic( lg.Panic(
"'compact' got an unexpected empty keyIndex", "'compact' got an unexpected empty keyIndex",
@ -233,7 +233,7 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]
} }
// keep finds the revision to be kept if compact is called at given atRev. // keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { func (ki *keyIndex) keep(atRev int64, available map[Revision]struct{}) {
if ki.isEmpty() { if ki.isEmpty() {
return return
} }
@ -248,11 +248,11 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
} }
} }
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) { func (ki *keyIndex) doCompact(atRev int64, available map[Revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision smaller or equal to "atRev", // walk until reaching the first revision smaller or equal to "atRev",
// and add the revision to the available map // and add the revision to the available map
f := func(rev revision) bool { f := func(rev Revision) bool {
if rev.main <= atRev { if rev.Main <= atRev {
available[rev] = struct{}{} available[rev] = struct{}{}
return false return false
} }
@ -262,7 +262,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (gen
genIdx, g := 0, &ki.generations[0] genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev // find first generation includes atRev or created after atRev
for genIdx < len(ki.generations)-1 { for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { if tomb := g.revs[len(g.revs)-1].Main; tomb > atRev {
break break
} }
genIdx++ genIdx++
@ -292,11 +292,11 @@ func (ki *keyIndex) findGeneration(rev int64) *generation {
} }
g := ki.generations[cg] g := ki.generations[cg]
if cg != lastg { if cg != lastg {
if tomb := g.revs[len(g.revs)-1].main; tomb <= rev { if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev {
return nil return nil
} }
} }
if g.revs[0].main <= rev { if g.revs[0].Main <= rev {
return &ki.generations[cg] return &ki.generations[cg]
} }
cg-- cg--
@ -338,8 +338,8 @@ func (ki *keyIndex) String() string {
// generation contains multiple revisions of a key. // generation contains multiple revisions of a key.
type generation struct { type generation struct {
ver int64 ver int64
created revision // when the generation is created (put in first revision). created Revision // when the generation is created (put in first revision).
revs []revision revs []Revision
} }
func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
@ -349,7 +349,7 @@ func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
// walk returns until: 1. it finishes walking all pairs 2. the function returns false. // walk returns until: 1. it finishes walking all pairs 2. the function returns false.
// walk returns the position at where it stopped. If it stopped after // walk returns the position at where it stopped. If it stopped after
// finishing walking, -1 will be returned. // finishing walking, -1 will be returned.
func (g *generation) walk(f func(rev revision) bool) int { func (g *generation) walk(f func(rev Revision) bool) int {
l := len(g.revs) l := len(g.revs)
for i := range g.revs { for i := range g.revs {
ok := f(g.revs[l-i-1]) ok := f(g.revs[l-i-1])

View File

@ -31,43 +31,43 @@ func TestKeyIndexGet(t *testing.T) {
// {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
// {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
ki := newTestKeyIndex(zaptest.NewLogger(t)) ki := newTestKeyIndex(zaptest.NewLogger(t))
ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{})) ki.compact(zaptest.NewLogger(t), 4, make(map[Revision]struct{}))
tests := []struct { tests := []struct {
rev int64 rev int64
wmod revision wmod Revision
wcreat revision wcreat Revision
wver int64 wver int64
werr error werr error
}{ }{
{17, revision{}, revision{}, 0, ErrRevisionNotFound}, {17, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{16, revision{}, revision{}, 0, ErrRevisionNotFound}, {16, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 3 // get on generation 3
{15, revision{14, 1}, revision{14, 0}, 2, nil}, {15, Revision{Main: 14, Sub: 1}, Revision{Main: 14}, 2, nil},
{14, revision{14, 1}, revision{14, 0}, 2, nil}, {14, Revision{Main: 14, Sub: 1}, Revision{Main: 14}, 2, nil},
{13, revision{}, revision{}, 0, ErrRevisionNotFound}, {13, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{12, revision{}, revision{}, 0, ErrRevisionNotFound}, {12, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 2 // get on generation 2
{11, revision{10, 0}, revision{8, 0}, 2, nil}, {11, Revision{Main: 10}, Revision{Main: 8}, 2, nil},
{10, revision{10, 0}, revision{8, 0}, 2, nil}, {10, Revision{Main: 10}, Revision{Main: 8}, 2, nil},
{9, revision{8, 0}, revision{8, 0}, 1, nil}, {9, Revision{Main: 8}, Revision{Main: 8}, 1, nil},
{8, revision{8, 0}, revision{8, 0}, 1, nil}, {8, Revision{Main: 8}, Revision{Main: 8}, 1, nil},
{7, revision{}, revision{}, 0, ErrRevisionNotFound}, {7, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{6, revision{}, revision{}, 0, ErrRevisionNotFound}, {6, Revision{}, Revision{}, 0, ErrRevisionNotFound},
// get on generation 1 // get on generation 1
{5, revision{4, 0}, revision{2, 0}, 2, nil}, {5, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{4, revision{4, 0}, revision{2, 0}, 2, nil}, {4, Revision{Main: 4}, Revision{Main: 2}, 2, nil},
{3, revision{}, revision{}, 0, ErrRevisionNotFound}, {3, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{2, revision{}, revision{}, 0, ErrRevisionNotFound}, {2, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{1, revision{}, revision{}, 0, ErrRevisionNotFound}, {1, Revision{}, Revision{}, 0, ErrRevisionNotFound},
{0, revision{}, revision{}, 0, ErrRevisionNotFound}, {0, Revision{}, Revision{}, 0, ErrRevisionNotFound},
} }
for i, tt := range tests { for i, tt := range tests {
@ -89,13 +89,21 @@ func TestKeyIndexGet(t *testing.T) {
func TestKeyIndexSince(t *testing.T) { func TestKeyIndexSince(t *testing.T) {
ki := newTestKeyIndex(zaptest.NewLogger(t)) ki := newTestKeyIndex(zaptest.NewLogger(t))
ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{})) ki.compact(zaptest.NewLogger(t), 4, make(map[Revision]struct{}))
allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}} allRevs := []Revision{
Revision{Main: 4},
Revision{Main: 6},
Revision{Main: 8},
Revision{Main: 10},
Revision{Main: 12},
Revision{Main: 14, Sub: 1},
Revision{Main: 16},
}
tests := []struct { tests := []struct {
rev int64 rev int64
wrevs []revision wrevs []Revision
}{ }{
{17, nil}, {17, nil},
{16, allRevs[6:]}, {16, allRevs[6:]},
@ -131,8 +139,8 @@ func TestKeyIndexPut(t *testing.T) {
wki := &keyIndex{ wki := &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{5, 0}, modified: Revision{Main: 5},
generations: []generation{{created: revision{5, 0}, ver: 1, revs: []revision{{main: 5}}}}, generations: []generation{{created: Revision{Main: 5}, ver: 1, revs: []Revision{Revision{Main: 5}}}},
} }
if !reflect.DeepEqual(ki, wki) { if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki) t.Errorf("ki = %+v, want %+v", ki, wki)
@ -142,8 +150,8 @@ func TestKeyIndexPut(t *testing.T) {
wki = &keyIndex{ wki = &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{7, 0}, modified: Revision{Main: 7},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}}, generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}}},
} }
if !reflect.DeepEqual(ki, wki) { if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki) t.Errorf("ki = %+v, want %+v", ki, wki)
@ -152,12 +160,12 @@ func TestKeyIndexPut(t *testing.T) {
func TestKeyIndexRestore(t *testing.T) { func TestKeyIndexRestore(t *testing.T) {
ki := &keyIndex{key: []byte("foo")} ki := &keyIndex{key: []byte("foo")}
ki.restore(zaptest.NewLogger(t), revision{5, 0}, revision{7, 0}, 2) ki.restore(zaptest.NewLogger(t), Revision{Main: 5}, Revision{Main: 7}, 2)
wki := &keyIndex{ wki := &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{7, 0}, modified: Revision{Main: 7},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 7}}}}, generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 7}}}},
} }
if !reflect.DeepEqual(ki, wki) { if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki) t.Errorf("ki = %+v, want %+v", ki, wki)
@ -175,8 +183,8 @@ func TestKeyIndexTombstone(t *testing.T) {
wki := &keyIndex{ wki := &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{7, 0}, modified: Revision{Main: 7},
generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}, {}}, generations: []generation{{created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}}, {}},
} }
if !reflect.DeepEqual(ki, wki) { if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki) t.Errorf("ki = %+v, want %+v", ki, wki)
@ -191,10 +199,10 @@ func TestKeyIndexTombstone(t *testing.T) {
wki = &keyIndex{ wki = &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{15, 0}, modified: Revision{Main: 15},
generations: []generation{ generations: []generation{
{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}, {created: Revision{Main: 5}, ver: 2, revs: []Revision{Revision{Main: 5}, Revision{Main: 7}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 9}, {main: 15}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 9}, Revision{Main: 15}}},
{}, {},
}, },
} }
@ -213,241 +221,241 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
compact int64 compact int64
wki *keyIndex wki *keyIndex
wam map[revision]struct{} wam map[Revision]struct{}
}{ }{
{ {
1, 1,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
{ {
2, 2,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 2}: {}, Revision{Main: 2}: {},
}, },
}, },
{ {
3, 3,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 2}: {}, Revision{Main: 2}: {},
}, },
}, },
{ {
4, 4,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 4}, Revision{Main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 4}: {}, Revision{Main: 4}: {},
}, },
}, },
{ {
5, 5,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 4}, Revision{Main: 6}}},
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 4}: {}, Revision{Main: 4}: {},
}, },
}, },
{ {
6, 6,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
{ {
7, 7,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
{ {
8, 8,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 8}: {}, Revision{Main: 8}: {},
}, },
}, },
{ {
9, 9,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 8}: {}, Revision{Main: 8}: {},
}, },
}, },
{ {
10, 10,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 10}: {}, Revision{Main: 10}: {},
}, },
}, },
{ {
11, 11,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 10}, Revision{Main: 12}}},
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 10}: {}, Revision{Main: 10}: {},
}, },
}, },
{ {
12, 12,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
{ {
13, 13,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
{ {
14, 14,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 14, sub: 1}: {}, Revision{Main: 14, Sub: 1}: {},
}, },
}, },
{ {
15, 15,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14, Sub: 1}, Revision{Main: 16}}},
{}, {},
}, },
}, },
map[revision]struct{}{ map[Revision]struct{}{
{main: 14, sub: 1}: {}, Revision{Main: 14, Sub: 1}: {},
}, },
}, },
{ {
16, 16,
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{16, 0}, modified: Revision{Main: 16},
generations: []generation{ generations: []generation{
{}, {},
}, },
}, },
map[revision]struct{}{}, map[Revision]struct{}{},
}, },
} }
// Continuous Compaction and finding Keep // Continuous Compaction and finding Keep
ki := newTestKeyIndex(zaptest.NewLogger(t)) ki := newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests { for i, tt := range tests {
am := make(map[revision]struct{}) am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki) kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am) ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) { if !reflect.DeepEqual(ki, kiclone) {
@ -456,7 +464,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) { if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
} }
am = make(map[revision]struct{}) am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am) ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) { if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -470,7 +478,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
ki = newTestKeyIndex(zaptest.NewLogger(t)) ki = newTestKeyIndex(zaptest.NewLogger(t))
for i, tt := range tests { for i, tt := range tests {
if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
am := make(map[revision]struct{}) am := make(map[Revision]struct{})
kiclone := cloneKeyIndex(ki) kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am) ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) { if !reflect.DeepEqual(ki, kiclone) {
@ -479,7 +487,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) { if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
} }
am = make(map[revision]struct{}) am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am) ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) { if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -494,7 +502,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
// Once Compaction and finding Keep // Once Compaction and finding Keep
for i, tt := range tests { for i, tt := range tests {
ki := newTestKeyIndex(zaptest.NewLogger(t)) ki := newTestKeyIndex(zaptest.NewLogger(t))
am := make(map[revision]struct{}) am := make(map[Revision]struct{})
ki.keep(tt.compact, am) ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiClone) { if !reflect.DeepEqual(ki, kiClone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
@ -502,7 +510,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
if !reflect.DeepEqual(am, tt.wam) { if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
} }
am = make(map[revision]struct{}) am = make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), tt.compact, am) ki.compact(zaptest.NewLogger(t), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) { if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@ -525,7 +533,7 @@ func cloneGeneration(g *generation) *generation {
if g.revs == nil { if g.revs == nil {
return &generation{g.ver, g.created, nil} return &generation{g.ver, g.created, nil}
} }
tmp := make([]revision, len(g.revs)) tmp := make([]Revision, len(g.revs))
copy(tmp, g.revs) copy(tmp, g.revs)
return &generation{g.ver, g.created, tmp} return &generation{g.ver, g.created, tmp}
} }
@ -536,18 +544,18 @@ func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
ki := &keyIndex{key: []byte("foo")} ki := &keyIndex{key: []byte("foo")}
ki.put(zaptest.NewLogger(t), 1, 0) ki.put(zaptest.NewLogger(t), 1, 0)
ki.put(zaptest.NewLogger(t), 2, 0) ki.put(zaptest.NewLogger(t), 2, 0)
am := make(map[revision]struct{}) am := make(map[Revision]struct{})
ki.compact(zaptest.NewLogger(t), 3, am) ki.compact(zaptest.NewLogger(t), 3, am)
wki := &keyIndex{ wki := &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{2, 0}, modified: Revision{Main: 2},
generations: []generation{ generations: []generation{
{created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}}, {created: Revision{Main: 1}, ver: 2, revs: []Revision{Revision{Main: 2}}},
}, },
} }
wam := map[revision]struct{}{ wam := map[Revision]struct{}{
{main: 2}: {}, Revision{Main: 2}: {},
} }
if !reflect.DeepEqual(ki, wki) { if !reflect.DeepEqual(ki, wki) {
t.Errorf("ki = %+v, want %+v", ki, wki) t.Errorf("ki = %+v, want %+v", ki, wki)
@ -572,9 +580,9 @@ func TestKeyIndexIsEmpty(t *testing.T) {
{ {
&keyIndex{ &keyIndex{
key: []byte("foo"), key: []byte("foo"),
modified: revision{2, 0}, modified: Revision{Main: 2},
generations: []generation{ generations: []generation{
{created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}}, {created: Revision{Main: 1}, ver: 2, revs: []Revision{Revision{Main: 2}}},
}, },
}, },
false, false,
@ -644,7 +652,7 @@ func TestGenerationIsEmpty(t *testing.T) {
}{ }{
{nil, true}, {nil, true},
{&generation{}, true}, {&generation{}, true},
{&generation{revs: []revision{{main: 1}}}, false}, {&generation{revs: []Revision{Revision{Main: 1}}}, false},
} }
for i, tt := range tests { for i, tt := range tests {
g := tt.g.isEmpty() g := tt.g.isEmpty()
@ -657,19 +665,19 @@ func TestGenerationIsEmpty(t *testing.T) {
func TestGenerationWalk(t *testing.T) { func TestGenerationWalk(t *testing.T) {
g := &generation{ g := &generation{
ver: 3, ver: 3,
created: revision{2, 0}, created: Revision{Main: 2},
revs: []revision{{main: 2}, {main: 4}, {main: 6}}, revs: []Revision{Revision{Main: 2}, Revision{Main: 4}, Revision{Main: 6}},
} }
tests := []struct { tests := []struct {
f func(rev revision) bool f func(rev Revision) bool
wi int wi int
}{ }{
{func(rev revision) bool { return rev.main >= 7 }, 2}, {func(rev Revision) bool { return rev.Main >= 7 }, 2},
{func(rev revision) bool { return rev.main >= 6 }, 1}, {func(rev Revision) bool { return rev.Main >= 6 }, 1},
{func(rev revision) bool { return rev.main >= 5 }, 1}, {func(rev Revision) bool { return rev.Main >= 5 }, 1},
{func(rev revision) bool { return rev.main >= 4 }, 0}, {func(rev Revision) bool { return rev.Main >= 4 }, 0},
{func(rev revision) bool { return rev.main >= 3 }, 0}, {func(rev Revision) bool { return rev.Main >= 3 }, 0},
{func(rev revision) bool { return rev.main >= 2 }, -1}, {func(rev Revision) bool { return rev.Main >= 2 }, -1},
} }
for i, tt := range tests { for i, tt := range tests {
idx := g.walk(tt.f) idx := g.walk(tt.f)

View File

@ -37,15 +37,6 @@ var (
ErrFutureRev = errors.New("mvcc: required revision is a future revision") ErrFutureRev = errors.New("mvcc: required revision is a future revision")
) )
const (
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
)
var restoreChunkKeys = 10000 // non-const for testing var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000 var defaultCompactBatchLimit = 1000
var minimumBatchInterval = 10 * time.Millisecond var minimumBatchInterval = 10 * time.Millisecond
@ -320,9 +311,9 @@ func (s *store) Restore(b backend.Backend) error {
func (s *store) restore() error { func (s *store) restore() error {
s.setupMetricsReporter() s.setupMetricsReporter()
min, max := newRevBytes(), newRevBytes() min, max := NewRevBytes(), NewRevBytes()
revToBytes(revision{main: 1}, min) min = RevToBytes(Revision{Main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) max = RevToBytes(Revision{Main: math.MaxInt64, Sub: math.MaxInt64}, max)
keyToLease := make(map[string]lease.LeaseID) keyToLease := make(map[string]lease.LeaseID)
@ -359,9 +350,9 @@ func (s *store) restore() error {
break break
} }
// next set begins after where this one ended // next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) newMin := BytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++ newMin.Sub++
revToBytes(newMin, min) min = RevToBytes(newMin, min)
} }
close(rkvc) close(rkvc)
@ -448,18 +439,18 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
ok = true ok = true
} }
} }
rev := bytesToRev(rkv.key) rev := BytesToRev(rkv.key)
currentRev = rev.main currentRev = rev.Main
if ok { if ok {
if isTombstone(rkv.key) { if isTombstone(rkv.key) {
if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { if err := ki.tombstone(lg, rev.Main, rev.Sub); err != nil {
lg.Warn("tombstone encountered error", zap.Error(err)) lg.Warn("tombstone encountered error", zap.Error(err))
} }
continue continue
} }
ki.put(lg, rev.main, rev.sub) ki.put(lg, rev.Main, rev.Sub)
} else if !isTombstone(rkv.key) { } else if !isTombstone(rkv.key) {
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version)
idx.Insert(ki) idx.Insert(ki)
kiCache[rkv.kstr] = ki kiCache[rkv.kstr] = ki
} }
@ -519,23 +510,6 @@ func (s *store) setupMetricsReporter() {
reportCompactRevMu.Unlock() reportCompactRevMu.Unlock()
} }
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
if len(b) != revBytesLen {
lg.Panic(
"cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)),
)
}
return append(b, markTombstone)
}
// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
func (s *store) HashStorage() HashStorage { func (s *store) HashStorage() HashStorage {
return s.hashes return s.hashes
} }

View File

@ -44,7 +44,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
h := newKVHasher(prevCompactRev, compactMainRev, keep) h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8) last := make([]byte, 8+1+8)
for { for {
var rev revision var rev Revision
start := time.Now() start := time.Now()
@ -52,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
tx.LockOutsideApply() tx.LockOutsideApply()
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for i := range keys { for i := range keys {
rev = bytesToRev(keys[i]) rev = BytesToRev(keys[i])
if _, ok := keep[rev]; !ok { if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(schema.Key, keys[i]) tx.UnsafeDelete(schema.Key, keys[i])
keyCompactions++ keyCompactions++
@ -77,7 +77,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
tx.Unlock() tx.Unlock()
// update last // update last
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last) last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)
// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
// gofail: var compactBeforeCommitBatch struct{} // gofail: var compactBeforeCommitBatch struct{}
s.b.ForceCommit() s.b.ForceCommit()

View File

@ -29,12 +29,12 @@ import (
) )
func TestScheduleCompaction(t *testing.T) { func TestScheduleCompaction(t *testing.T) {
revs := []revision{{1, 0}, {2, 0}, {3, 0}} revs := []Revision{Revision{Main: 1}, Revision{Main: 2}, Revision{Main: 3}}
tests := []struct { tests := []struct {
rev int64 rev int64
keep map[revision]struct{} keep map[Revision]struct{}
wrevs []revision wrevs []Revision
}{ }{
// compact at 1 and discard all history // compact at 1 and discard all history
{ {
@ -51,17 +51,17 @@ func TestScheduleCompaction(t *testing.T) {
// compact at 1 and keeps history one step earlier // compact at 1 and keeps history one step earlier
{ {
1, 1,
map[revision]struct{}{ map[Revision]struct{}{
{main: 1}: {}, {Main: 1}: {},
}, },
revs, revs,
}, },
// compact at 1 and keeps history two steps earlier // compact at 1 and keeps history two steps earlier
{ {
3, 3,
map[revision]struct{}{ map[Revision]struct{}{
{main: 2}: {}, {Main: 2}: {},
{main: 3}: {}, {Main: 3}: {},
}, },
revs[1:], revs[1:],
}, },
@ -76,9 +76,9 @@ func TestScheduleCompaction(t *testing.T) {
tx := s.b.BatchTx() tx := s.b.BatchTx()
tx.Lock() tx.Lock()
ibytes := newRevBytes() ibytes := NewRevBytes()
for _, rev := range revs { for _, rev := range revs {
revToBytes(rev, ibytes) ibytes = RevToBytes(rev, ibytes)
tx.UnsafePut(schema.Key, ibytes, []byte("bar")) tx.UnsafePut(schema.Key, ibytes, []byte("bar"))
} }
tx.Unlock() tx.Unlock()
@ -90,7 +90,7 @@ func TestScheduleCompaction(t *testing.T) {
tx.Lock() tx.Lock()
for _, rev := range tt.wrevs { for _, rev := range tt.wrevs {
revToBytes(rev, ibytes) ibytes = RevToBytes(rev, ibytes)
keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0) keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0)
if len(keys) != 1 { if len(keys) != 1 {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys)) t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))

View File

@ -70,22 +70,22 @@ func TestStorePut(t *testing.T) {
} }
tests := []struct { tests := []struct {
rev revision rev Revision
r indexGetResp r indexGetResp
rr *rangeResp rr *rangeResp
wrev revision wrev Revision
wkey []byte wkey []byte
wkv mvccpb.KeyValue wkv mvccpb.KeyValue
wputrev revision wputrev Revision
}{ }{
{ {
revision{1, 0}, Revision{Main: 1},
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, indexGetResp{Revision{}, Revision{}, 0, ErrRevisionNotFound},
nil, nil,
revision{2, 0}, Revision{Main: 2},
newTestKeyBytes(lg, revision{2, 0}, false), newTestRevBytes(Revision{Main: 2}),
mvccpb.KeyValue{ mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -94,15 +94,15 @@ func TestStorePut(t *testing.T) {
Version: 1, Version: 1,
Lease: 1, Lease: 1,
}, },
revision{2, 0}, Revision{Main: 2},
}, },
{ {
revision{1, 1}, Revision{Main: 1, Sub: 1},
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, indexGetResp{Revision{Main: 2}, Revision{Main: 2}, 1, nil},
&rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}}, &rangeResp{[][]byte{newTestRevBytes(Revision{Main: 2, Sub: 1})}, [][]byte{kvb}},
revision{2, 0}, Revision{Main: 2},
newTestKeyBytes(lg, revision{2, 0}, false), newTestRevBytes(Revision{Main: 2}),
mvccpb.KeyValue{ mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -111,15 +111,15 @@ func TestStorePut(t *testing.T) {
Version: 2, Version: 2,
Lease: 2, Lease: 2,
}, },
revision{2, 0}, Revision{Main: 2},
}, },
{ {
revision{2, 0}, Revision{Main: 2},
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, indexGetResp{Revision{Main: 2, Sub: 1}, Revision{Main: 2}, 2, nil},
&rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}}, &rangeResp{[][]byte{newTestRevBytes(Revision{Main: 2, Sub: 1})}, [][]byte{kvb}},
revision{3, 0}, Revision{Main: 3},
newTestKeyBytes(lg, revision{3, 0}, false), newTestRevBytes(Revision{Main: 3}),
mvccpb.KeyValue{ mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -128,7 +128,7 @@ func TestStorePut(t *testing.T) {
Version: 3, Version: 3,
Lease: 3, Lease: 3,
}, },
revision{3, 0}, Revision{Main: 3},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -136,7 +136,7 @@ func TestStorePut(t *testing.T) {
b := s.b.(*fakeBackend) b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex) fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main s.currentRev = tt.rev.Main
fi.indexGetRespc <- tt.r fi.indexGetRespc <- tt.r
if tt.rr != nil { if tt.rr != nil {
b.tx.rangeRespc <- *tt.rr b.tx.rangeRespc <- *tt.rr
@ -163,13 +163,13 @@ func TestStorePut(t *testing.T) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
} }
wact = []testutil.Action{ wact = []testutil.Action{
{Name: "get", Params: []any{[]byte("foo"), tt.wputrev.main}}, {Name: "get", Params: []any{[]byte("foo"), tt.wputrev.Main}},
{Name: "put", Params: []any{[]byte("foo"), tt.wputrev}}, {Name: "put", Params: []any{[]byte("foo"), tt.wputrev}},
} }
if g := fi.Action(); !reflect.DeepEqual(g, wact) { if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
} }
if s.currentRev != tt.wrev.main { if s.currentRev != tt.wrev.Main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
} }
@ -179,7 +179,7 @@ func TestStorePut(t *testing.T) {
func TestStoreRange(t *testing.T) { func TestStoreRange(t *testing.T) {
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false) key := newTestRevBytes(Revision{Main: 2})
kv := mvccpb.KeyValue{ kv := mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -198,11 +198,11 @@ func TestStoreRange(t *testing.T) {
r rangeResp r rangeResp
}{ }{
{ {
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, indexRangeResp{[][]byte{[]byte("foo")}, []Revision{Revision{Main: 2}}},
rangeResp{[][]byte{key}, [][]byte{kvb}}, rangeResp{[][]byte{key}, [][]byte{kvb}},
}, },
{ {
indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []Revision{Revision{Main: 2}, Revision{Main: 3}}},
rangeResp{[][]byte{key}, [][]byte{kvb}}, rangeResp{[][]byte{key}, [][]byte{kvb}},
}, },
} }
@ -228,8 +228,8 @@ func TestStoreRange(t *testing.T) {
t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev) t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
} }
wstart := newRevBytes() wstart := NewRevBytes()
revToBytes(tt.idxr.revs[0], wstart) wstart = RevToBytes(tt.idxr.revs[0], wstart)
wact := []testutil.Action{ wact := []testutil.Action{
{Name: "range", Params: []any{schema.Key, wstart, []byte(nil), int64(0)}}, {Name: "range", Params: []any{schema.Key, wstart, []byte(nil), int64(0)}},
} }
@ -252,7 +252,7 @@ func TestStoreRange(t *testing.T) {
func TestStoreDeleteRange(t *testing.T) { func TestStoreDeleteRange(t *testing.T) {
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false) key := newTestRevBytes(Revision{Main: 2})
kv := mvccpb.KeyValue{ kv := mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -266,24 +266,24 @@ func TestStoreDeleteRange(t *testing.T) {
} }
tests := []struct { tests := []struct {
rev revision rev Revision
r indexRangeResp r indexRangeResp
rr rangeResp rr rangeResp
wkey []byte wkey []byte
wrev revision wrev Revision
wrrev int64 wrrev int64
wdelrev revision wdelrev Revision
}{ }{
{ {
revision{2, 0}, Revision{Main: 2},
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, indexRangeResp{[][]byte{[]byte("foo")}, []Revision{{Main: 2}}},
rangeResp{[][]byte{key}, [][]byte{kvb}}, rangeResp{[][]byte{key}, [][]byte{kvb}},
newTestKeyBytes(lg, revision{3, 0}, true), newTestBucketKeyBytes(newBucketKey(3, 0, true)),
revision{3, 0}, Revision{Main: 3},
2, 2,
revision{3, 0}, Revision{Main: 3},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -291,7 +291,7 @@ func TestStoreDeleteRange(t *testing.T) {
b := s.b.(*fakeBackend) b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex) fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main s.currentRev = tt.rev.Main
fi.indexRangeRespc <- tt.r fi.indexRangeRespc <- tt.r
b.tx.rangeRespc <- tt.rr b.tx.rangeRespc <- tt.rr
@ -319,7 +319,7 @@ func TestStoreDeleteRange(t *testing.T) {
if g := fi.Action(); !reflect.DeepEqual(g, wact) { if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
} }
if s.currentRev != tt.wrev.main { if s.currentRev != tt.wrev.Main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
} }
s.Close() s.Close()
@ -334,9 +334,9 @@ func TestStoreCompact(t *testing.T) {
fi := s.kvindex.(*fakeIndex) fi := s.kvindex.(*fakeIndex)
s.currentRev = 3 s.currentRev = 3
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} fi.indexCompactRespc <- map[Revision]struct{}{Revision{Main: 1}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false) key1 := newTestRevBytes(Revision{Main: 1})
key2 := newTestKeyBytes(lg, revision{2, 0}, false) key2 := newTestRevBytes(Revision{Main: 2})
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}} b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
@ -352,10 +352,10 @@ func TestStoreCompact(t *testing.T) {
wact := []testutil.Action{ wact := []testutil.Action{
{Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}}, {Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}}, {Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "put", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(Revision{Main: 3})}},
{Name: "range", Params: []any{schema.Key, make([]byte, 17), end, int64(10000)}}, {Name: "range", Params: []any{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []any{schema.Key, key2}}, {Name: "delete", Params: []any{schema.Key, key2}},
{Name: "put", Params: []any{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "put", Params: []any{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(Revision{Main: 3})}},
} }
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact) t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -375,7 +375,7 @@ func TestStoreRestore(t *testing.T) {
fi := s.kvindex.(*fakeIndex) fi := s.kvindex.(*fakeIndex)
defer s.Close() defer s.Close()
putkey := newTestKeyBytes(lg, revision{3, 0}, false) putkey := newTestRevBytes(Revision{Main: 3})
putkv := mvccpb.KeyValue{ putkv := mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
Value: []byte("bar"), Value: []byte("bar"),
@ -387,7 +387,7 @@ func TestStoreRestore(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delkey := newTestKeyBytes(lg, revision{5, 0}, true) delkey := newTestBucketKeyBytes(newBucketKey(5, 0, true))
delkv := mvccpb.KeyValue{ delkv := mvccpb.KeyValue{
Key: []byte("foo"), Key: []byte("foo"),
} }
@ -395,8 +395,8 @@ func TestStoreRestore(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(Revision{Main: 3})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(Revision{Main: 3})}}
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil} b.tx.rangeRespc <- rangeResp{nil, nil}
@ -412,17 +412,17 @@ func TestStoreRestore(t *testing.T) {
wact := []testutil.Action{ wact := []testutil.Action{
{Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []any{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []any{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []any{schema.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, {Name: "range", Params: []any{schema.Key, newTestRevBytes(Revision{Main: 1}), newTestRevBytes(Revision{Main: math.MaxInt64, Sub: math.MaxInt64}), int64(restoreChunkKeys)}},
} }
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact) t.Errorf("tx actions = %+v, want %+v", g, wact)
} }
gens := []generation{ gens := []generation{
{created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}}, {created: Revision{Main: 4}, ver: 2, revs: []Revision{Revision{Main: 3}, Revision{Main: 5}}},
{created: revision{0, 0}, ver: 0, revs: nil}, {created: Revision{Main: 0}, ver: 0, revs: nil},
} }
ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens} ki := &keyIndex{key: []byte("foo"), modified: Revision{Main: 5}, generations: gens}
wact = []testutil.Action{ wact = []testutil.Action{
{Name: "keyIndex", Params: []any{ki}}, {Name: "keyIndex", Params: []any{ki}},
{Name: "insert", Params: []any{ki}}, {Name: "insert", Params: []any{ki}},
@ -496,8 +496,6 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease) s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
// write scheduled compaction, but not do compaction // write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx() tx := s0.b.BatchTx()
tx.Lock() tx.Lock()
UnsafeSetScheduledCompact(tx, 2) UnsafeSetScheduledCompact(tx, 2)
@ -524,8 +522,8 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
} }
// check the key in backend is deleted // check the key in backend is deleted
revbytes := newRevBytes() revbytes := NewRevBytes()
revToBytes(revision{main: 1}, revbytes) revbytes = BucketKeyToBytes(newBucketKey(1, 0, false), revbytes)
// The disk compaction is done asynchronously and requires more time on slow disk. // The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO. // try 5 times for CI with slow IO.
@ -540,7 +538,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
} }
return return
} }
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) t.Errorf("key for rev %+v still exists, want deleted", BytesToBucketKey(revbytes))
}) })
} }
} }
@ -605,7 +603,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
} }
}() }()
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished // Compact the store in a goroutine, using RevisionTombstone 9900 to 10000 and close donec when finished
go func() { go func() {
defer close(donec) defer close(donec)
for i := 100; i >= 0; i-- { for i := 100; i >= 0; i-- {
@ -629,7 +627,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
} }
// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called // TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
// with a past revision (lower than compacted), a future revision, and the exact compacted revision // with a past RevisionTombstone (lower than compacted), a future RevisionTombstone, and the exact compacted RevisionTombstone
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t) b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
@ -662,7 +660,7 @@ func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
} }
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes // TestHashKVZeroRevision ensures that "HashByRev(0)" computes
// correct hash value with latest revision. // correct hash value with latest RevisionTombstone.
func TestHashKVZeroRevision(t *testing.T) { func TestHashKVZeroRevision(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t) b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
@ -882,19 +880,14 @@ func merge(dst, src kvs) kvs {
// TODO: test attach key to lessor // TODO: test attach key to lessor
func newTestRevBytes(rev revision) []byte { func newTestRevBytes(rev Revision) []byte {
bytes := newRevBytes() bytes := NewRevBytes()
revToBytes(rev, bytes) return RevToBytes(rev, bytes)
return bytes
} }
func newTestKeyBytes(lg *zap.Logger, rev revision, tombstone bool) []byte { func newTestBucketKeyBytes(rev BucketKey) []byte {
bytes := newRevBytes() bytes := NewRevBytes()
revToBytes(rev, bytes) return BucketKeyToBytes(rev, bytes)
if tombstone {
bytes = appendMarkTombstone(lg, bytes)
}
return bytes
} }
func newFakeStore(lg *zap.Logger) *store { func newFakeStore(lg *zap.Logger) *store {
@ -926,7 +919,7 @@ func newFakeIndex() *fakeIndex {
indexGetRespc: make(chan indexGetResp, 1), indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1), indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1), indexCompactRespc: make(chan map[Revision]struct{}, 1),
} }
} }
@ -986,19 +979,19 @@ func (b *fakeBackend) Close() error
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
type indexGetResp struct { type indexGetResp struct {
rev revision rev Revision
created revision created Revision
ver int64 ver int64
err error err error
} }
type indexRangeResp struct { type indexRangeResp struct {
keys [][]byte keys [][]byte
revs []revision revs []Revision
} }
type indexRangeEventsResp struct { type indexRangeEventsResp struct {
revs []revision revs []Revision
} }
type fakeIndex struct { type fakeIndex struct {
@ -1006,10 +999,10 @@ type fakeIndex struct {
indexGetRespc chan indexGetResp indexGetRespc chan indexGetResp
indexRangeRespc chan indexRangeResp indexRangeRespc chan indexRangeResp
indexRangeEventsRespc chan indexRangeEventsResp indexRangeEventsRespc chan indexRangeEventsResp
indexCompactRespc chan map[revision]struct{} indexCompactRespc chan map[Revision]struct{}
} }
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) { func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]Revision, int) {
_, rev := i.Range(key, end, atRev) _, rev := i.Range(key, end, atRev)
if len(rev) >= limit { if len(rev) >= limit {
rev = rev[:limit] rev = rev[:limit]
@ -1022,33 +1015,33 @@ func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
return len(rev) return len(rev)
} }
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created Revision, ver int64, err error) {
i.Recorder.Record(testutil.Action{Name: "get", Params: []any{key, atRev}}) i.Recorder.Record(testutil.Action{Name: "get", Params: []any{key, atRev}})
r := <-i.indexGetRespc r := <-i.indexGetRespc
return r.rev, r.created, r.ver, r.err return r.rev, r.created, r.ver, r.err
} }
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) { func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []Revision) {
i.Recorder.Record(testutil.Action{Name: "range", Params: []any{key, end, atRev}}) i.Recorder.Record(testutil.Action{Name: "range", Params: []any{key, end, atRev}})
r := <-i.indexRangeRespc r := <-i.indexRangeRespc
return r.keys, r.revs return r.keys, r.revs
} }
func (i *fakeIndex) Put(key []byte, rev revision) { func (i *fakeIndex) Put(key []byte, rev Revision) {
i.Recorder.Record(testutil.Action{Name: "put", Params: []any{key, rev}}) i.Recorder.Record(testutil.Action{Name: "put", Params: []any{key, rev}})
} }
func (i *fakeIndex) Tombstone(key []byte, rev revision) error { func (i *fakeIndex) Tombstone(key []byte, rev Revision) error {
i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []any{key, rev}}) i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []any{key, rev}})
return nil return nil
} }
func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision { func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []Revision {
i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []any{key, end, rev}}) i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []any{key, end, rev}})
r := <-i.indexRangeEventsRespc r := <-i.indexRangeEventsRespc
return r.revs return r.revs
} }
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { func (i *fakeIndex) Compact(rev int64) map[Revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []any{rev}}) i.Recorder.Record(testutil.Action{Name: "compact", Params: []any{rev}})
return <-i.indexCompactRespc return <-i.indexCompactRespc
} }
func (i *fakeIndex) Keep(rev int64) map[revision]struct{} { func (i *fakeIndex) Keep(rev int64) map[Revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "keep", Params: []any{rev}}) i.Recorder.Record(testutil.Action{Name: "keep", Params: []any{rev}})
return <-i.indexCompactRespc return <-i.indexCompactRespc
} }

View File

@ -97,20 +97,20 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev
} }
kvs := make([]mvccpb.KeyValue, limit) kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes() revBytes := NewRevBytes()
for i, revpair := range revpairs[:len(kvs)] { for i, revpair := range revpairs[:len(kvs)] {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err()) return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
default: default:
} }
revToBytes(revpair, revBytes) revBytes = RevToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0) _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
if len(vs) != 1 { if len(vs) != 1 {
tr.s.lg.Fatal( tr.s.lg.Fatal(
"range failed to find revision pair", "range failed to find revision pair",
zap.Int64("revision-main", revpair.main), zap.Int64("revision-main", revpair.Main),
zap.Int64("revision-sub", revpair.sub), zap.Int64("revision-sub", revpair.Sub),
zap.Int64("revision-current", curRev), zap.Int64("revision-current", curRev),
zap.Int64("range-option-rev", ro.Rev), zap.Int64("range-option-rev", ro.Rev),
zap.Int64("range-option-limit", ro.Limit), zap.Int64("range-option-limit", ro.Limit),
@ -202,13 +202,13 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
// get its previous leaseID // get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev) _, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil { if err == nil {
c = created.main c = created.Main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
tw.trace.Step("get key's previous created_revision and leaseID") tw.trace.Step("get key's previous created_revision and leaseID")
} }
ibytes := newRevBytes() ibytes := NewRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))} idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes) ibytes = RevToBytes(idxRev, ibytes)
ver = ver + 1 ver = ver + 1
kv := mvccpb.KeyValue{ kv := mvccpb.KeyValue{
@ -279,11 +279,9 @@ func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
} }
func (tw *storeTxnWrite) delete(key []byte) { func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes() ibytes := NewRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} idxRev := newBucketKey(tw.beginRev+1, int64(len(tw.changes)), true)
revToBytes(idxRev, ibytes) ibytes = BucketKeyToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnCommon.s.lg, ibytes)
kv := mvccpb.KeyValue{Key: key} kv := mvccpb.KeyValue{Key: key}
@ -296,7 +294,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
} }
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev) err = tw.s.kvindex.Tombstone(key, idxRev.Revision)
if err != nil { if err != nil {
tw.storeTxnCommon.s.lg.Fatal( tw.storeTxnCommon.s.lg.Fatal(
"failed to tombstone an existing key", "failed to tombstone an existing key",

View File

@ -14,48 +14,97 @@
package mvcc package mvcc
import "encoding/binary" import (
"encoding/binary"
)
// revBytesLen is the byte length of a normal revision. const (
// First 8 bytes is the revision.main in big-endian format. The 9th byte // revBytesLen is the byte length of a normal revision.
// is a '_'. The last 8 bytes is the revision.sub in big-endian format. // First 8 bytes is the revision.main in big-endian format. The 9th byte
const revBytesLen = 8 + 1 + 8 // is a '_'. The last 8 bytes is the revision.sub in big-endian format.
revBytesLen = 8 + 1 + 8
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
)
// A revision indicates modification of the key-value space. type Revision struct {
// The set of changes that share same main revision changes the key-value space atomically. // Main is the main revision of a set of changes that happen atomically.
type revision struct { Main int64
// main is the main revision of a set of changes that happen atomically. // Sub is the sub revision of a change in a set of changes that happen
main int64
// sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that // atomically. Each change has different increasing sub revision in that
// set. // set.
sub int64 Sub int64
} }
func (a revision) GreaterThan(b revision) bool { func (a Revision) GreaterThan(b Revision) bool {
if a.main > b.main { if a.Main > b.Main {
return true return true
} }
if a.main < b.main { if a.Main < b.Main {
return false return false
} }
return a.sub > b.sub return a.Sub > b.Sub
} }
func newRevBytes() []byte { func RevToBytes(rev Revision, bytes []byte) []byte {
return BucketKeyToBytes(newBucketKey(rev.Main, rev.Sub, false), bytes)
}
func BytesToRev(bytes []byte) Revision {
return BytesToBucketKey(bytes).Revision
}
// BucketKey indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type BucketKey struct {
Revision
tombstone bool
}
func newBucketKey(main, sub int64, isTombstone bool) BucketKey {
return BucketKey{
Revision: Revision{
Main: main,
Sub: sub,
},
tombstone: isTombstone,
}
}
func NewRevBytes() []byte {
return make([]byte, revBytesLen, markedRevBytesLen) return make([]byte, revBytesLen, markedRevBytesLen)
} }
func revToBytes(rev revision, bytes []byte) { func BucketKeyToBytes(rev BucketKey, bytes []byte) []byte {
binary.BigEndian.PutUint64(bytes, uint64(rev.main)) binary.BigEndian.PutUint64(bytes, uint64(rev.Main))
bytes[8] = '_' bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) binary.BigEndian.PutUint64(bytes[9:], uint64(rev.Sub))
if rev.tombstone {
switch len(bytes) {
case revBytesLen:
bytes = append(bytes, markTombstone)
case markedRevBytesLen:
bytes[markBytePosition] = markTombstone
}
}
return bytes
} }
func bytesToRev(bytes []byte) revision { func BytesToBucketKey(bytes []byte) BucketKey {
return revision{ return BucketKey{
main: int64(binary.BigEndian.Uint64(bytes[0:8])), Revision: Revision{
sub: int64(binary.BigEndian.Uint64(bytes[9:])), Main: int64(binary.BigEndian.Uint64(bytes[0:8])),
Sub: int64(binary.BigEndian.Uint64(bytes[9:])),
},
tombstone: isTombstone(bytes),
} }
} }
// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}

View File

@ -1,53 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"math"
"reflect"
"testing"
)
// TestRevision tests that revision could be encoded to and decoded from
// bytes slice. Moreover, the lexicographical order of its byte slice representation
// follows the order of (main, sub).
func TestRevision(t *testing.T) {
tests := []revision{
// order in (main, sub)
{},
{main: 1, sub: 0},
{main: 1, sub: 1},
{main: 2, sub: 0},
{main: math.MaxInt64, sub: math.MaxInt64},
}
bs := make([][]byte, len(tests))
for i, tt := range tests {
b := newRevBytes()
revToBytes(tt, b)
bs[i] = b
if grev := bytesToRev(b); !reflect.DeepEqual(grev, tt) {
t.Errorf("#%d: revision = %+v, want %+v", i, grev, tt)
}
}
for i := 0; i < len(tests)-1; i++ {
if bytes.Compare(bs[i], bs[i+1]) >= 0 {
t.Errorf("#%d: %v (%+v) should be smaller than %v (%+v)", i, bs[i], tests[i], bs[i+1], tests[i+1])
}
}
}

View File

@ -22,7 +22,7 @@ import (
func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) { func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0) _, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 { if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true return BytesToRev(finishedCompactBytes[0]).Main, true
} }
return 0, false return 0, false
} }
@ -30,7 +30,7 @@ func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, f
func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) { func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0) _, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 { if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true return BytesToRev(scheduledCompactBytes[0]).Main, true
} }
return 0, false return 0, false
} }
@ -42,8 +42,8 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) {
} }
func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) { func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes() rbytes := NewRevBytes()
revToBytes(revision{main: value}, rbytes) rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes) tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
} }
@ -54,7 +54,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) {
} }
func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) { func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes() rbytes := NewRevBytes()
revToBytes(revision{main: value}, rbytes) rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes) tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
} }

View File

@ -352,9 +352,9 @@ func (s *watchableStore) syncWatchers() int {
compactionRev := s.store.compactMainRev compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes() minBytes, maxBytes := NewRevBytes(), NewRevBytes()
revToBytes(revision{main: minRev}, minBytes) minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes) maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
// UnsafeRange returns keys and values. And in boltdb, keys are revisions. // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend. // values are actual key-value pairs in backend.
@ -428,7 +428,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
if isTombstone(revs[i]) { if isTombstone(revs[i]) {
ty = mvccpb.DELETE ty = mvccpb.DELETE
// patch in mod revision so watchers won't skip // patch in mod revision so watchers won't skip
kv.ModRevision = bytesToRev(revs[i]).main kv.ModRevision = BytesToRev(revs[i]).Main
} }
evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty}) evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
} }