server: Cache compaction hash for HashByRev API

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz
2022-06-07 15:11:57 +02:00
parent 7358362c99
commit 1200b1006d
5 changed files with 105 additions and 4 deletions

View File

@@ -251,6 +251,10 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6
return hashByRev.hash, hashByRev.revision, hashByRev.err return hashByRev.hash, hashByRev.revision, hashByRev.err
} }
// Store is a deliberate stub: unlike the other fakeHasher methods it does
// not record the call in f.actions, and any test that reaches it fails
// loudly so the missing coverage is obvious.
func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
	panic("not implemented")
}
func (f *fakeHasher) ReqTimeout() time.Duration { func (f *fakeHasher) ReqTimeout() time.Duration {
f.actions = append(f.actions, "ReqTimeout()") f.actions = append(f.actions, "ReqTimeout()")
return time.Second return time.Second

View File

@@ -17,9 +17,16 @@ package mvcc
import ( import (
"hash" "hash"
"hash/crc32" "hash/crc32"
"sort"
"sync"
"go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
const (
hashStorageMaxSize = 10
) )
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
@@ -83,15 +90,22 @@ type HashStorage interface {
// HashByRev computes the hash of all MVCC revisions up to a given revision. // HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
// Store adds a hash value to the local cache, allowing it to be returned by HashByRev.
Store(valueHash KeyValueHash)
} }
type hashStorage struct { type hashStorage struct {
store *store store *store
hashMu sync.RWMutex
hashes []KeyValueHash
lg *zap.Logger
} }
func newHashStorage(s *store) HashStorage { func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
return &hashStorage{ return &hashStorage{
store: s, store: s,
lg: lg,
} }
} }
@@ -100,5 +114,35 @@ func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
} }
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) { func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
s.hashMu.RLock()
for _, h := range s.hashes {
if rev == h.Revision {
s.hashMu.RUnlock()
s.store.revMu.RLock()
currentRev := s.store.currentRev
s.store.revMu.RUnlock()
return h, currentRev, nil
}
}
s.hashMu.RUnlock()
return s.store.hashByRev(rev) return s.store.hashByRev(rev)
} }
// Store caches hash so that later HashByRev calls for the same revision can
// be served without recomputing it. The cache stays sorted by revision and
// is capped at hashStorageMaxSize entries; on overflow the entries with the
// lowest revisions are evicted.
func (s *hashStorage) Store(hash KeyValueHash) {
	s.lg.Info("storing new hash",
		zap.Uint32("hash", hash.Hash),
		zap.Int64("revision", hash.Revision),
		zap.Int64("compact-revision", hash.CompactRevision),
	)

	s.hashMu.Lock()
	defer s.hashMu.Unlock()
	hashes := append(s.hashes, hash)
	sort.Slice(hashes, func(a, b int) bool {
		return hashes[a].Revision < hashes[b].Revision
	})
	if overflow := len(hashes) - hashStorageMaxSize; overflow > 0 {
		hashes = hashes[overflow:]
	}
	s.hashes = hashes
}

View File

@@ -187,3 +187,53 @@ func pickKey(i int64) string {
panic("Can't count") panic("Can't count")
} }
} }
// TestHasherStore verifies that every hash added via Store is returned by
// HashByRev with all fields (hash, revision, compact revision) intact.
func TestHasherStore(t *testing.T) {
	lg := zaptest.NewLogger(t)
	s := newHashStorage(lg, newFakeStore())
	var hashes []KeyValueHash
	for i := 0; i < hashStorageMaxSize; i++ {
		hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100}
		hashes = append(hashes, hash)
		s.Store(hash)
	}

	for _, want := range hashes {
		got, _, err := s.HashByRev(want.Revision)
		if err != nil {
			t.Fatal(err)
		}
		// Format arguments are ordered to match the "got:"/"expected:"
		// labels (the original printed them swapped).
		if want.Hash != got.Hash {
			t.Errorf("Expected stored hash to match, got: %d, expected: %d", got.Hash, want.Hash)
		}
		if want.Revision != got.Revision {
			t.Errorf("Expected stored revision to match, got: %d, expected: %d", got.Revision, want.Revision)
		}
		if want.CompactRevision != got.CompactRevision {
			t.Errorf("Expected stored compact revision to match, got: %d, expected: %d", got.CompactRevision, want.CompactRevision)
		}
	}
}
// TestHasherStoreFull checks the eviction policy once the cache holds
// hashStorageMaxSize entries: a hash older than everything cached is
// dropped immediately, while a newer hash displaces the oldest entry.
func TestHasherStoreFull(t *testing.T) {
	lg := zaptest.NewLogger(t)
	s := newHashStorage(lg, newFakeStore())

	const base int64 = 100
	top := base + hashStorageMaxSize
	for rev := base; rev < top; rev++ {
		s.Store(KeyValueHash{Revision: rev})
	}

	// Hash for old revision should be discarded as storage is already full
	s.Store(KeyValueHash{Revision: base - 1})
	if hash, _, err := s.HashByRev(base - 1); err == nil {
		t.Errorf("Expected an error as old revision should be discarded, got: %v", hash)
	}

	// Hash for new revision should be stored even when storage is full
	s.Store(KeyValueHash{Revision: top + 1})
	if _, _, err := s.HashByRev(top + 1); err != nil {
		t.Errorf("Didn't expect error for new revision, err: %v", err)
	}
}

View File

@@ -112,7 +112,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
lg: lg, lg: lg,
} }
s.hashes = newHashStorage(s) s.hashes = newHashStorage(lg, s)
s.ReadView = &readView{s} s.ReadView = &readView{s}
s.WriteView = &writeView{s} s.WriteView = &writeView{s}
if s.le != nil { if s.le != nil {
@@ -234,11 +234,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(ctx, ch) s.compactBarrier(ctx, ch)
return return
} }
if _, err := s.scheduleCompaction(rev, prevCompactRev); err != nil { hash, err := s.scheduleCompaction(rev, prevCompactRev)
if err != nil {
s.lg.Warn("Failed compaction", zap.Error(err)) s.lg.Warn("Failed compaction", zap.Error(err))
s.compactBarrier(context.TODO(), ch) s.compactBarrier(context.TODO(), ch)
return return
} }
s.hashes.Store(hash)
close(ch) close(ch)
} }

View File

@@ -851,6 +851,7 @@ func newFakeStore() *store {
lg: zap.NewExample(), lg: zap.NewExample(),
} }
s.ReadView, s.WriteView = &readView{s}, &writeView{s} s.ReadView, s.WriteView = &readView{s}, &writeView{s}
s.hashes = newHashStorage(zap.NewExample(), s)
return s return s
} }