mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Cache compaction hash for HashByRev API
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
2b090e86a6
commit
0e739da9a4
@ -251,6 +251,10 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6
|
|||||||
return hashByRev.hash, hashByRev.revision, hashByRev.err
|
return hashByRev.hash, hashByRev.revision, hashByRev.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeHasher) ReqTimeout() time.Duration {
|
func (f *fakeHasher) ReqTimeout() time.Duration {
|
||||||
f.actions = append(f.actions, "ReqTimeout()")
|
f.actions = append(f.actions, "ReqTimeout()")
|
||||||
return time.Second
|
return time.Second
|
||||||
|
@ -17,9 +17,16 @@ package mvcc
|
|||||||
import (
|
import (
|
||||||
"hash"
|
"hash"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
hashStorageMaxSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
||||||
@ -83,15 +90,22 @@ type HashStorage interface {
|
|||||||
|
|
||||||
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
||||||
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
|
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
|
||||||
|
|
||||||
|
// Store adds hash value in local cache, allowing it can be returned by HashByRev.
|
||||||
|
Store(valueHash KeyValueHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
type hashStorage struct {
|
type hashStorage struct {
|
||||||
store *store
|
store *store
|
||||||
|
hashMu sync.RWMutex
|
||||||
|
hashes []KeyValueHash
|
||||||
|
lg *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHashStorage(s *store) HashStorage {
|
func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
|
||||||
return &hashStorage{
|
return &hashStorage{
|
||||||
store: s,
|
store: s,
|
||||||
|
lg: lg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,5 +114,35 @@ func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
|
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
|
||||||
|
s.hashMu.RLock()
|
||||||
|
for _, h := range s.hashes {
|
||||||
|
if rev == h.Revision {
|
||||||
|
s.hashMu.RUnlock()
|
||||||
|
|
||||||
|
s.store.revMu.RLock()
|
||||||
|
currentRev := s.store.currentRev
|
||||||
|
s.store.revMu.RUnlock()
|
||||||
|
return h, currentRev, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.hashMu.RUnlock()
|
||||||
|
|
||||||
return s.store.hashByRev(rev)
|
return s.store.hashByRev(rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *hashStorage) Store(hash KeyValueHash) {
|
||||||
|
s.lg.Info("storing new hash",
|
||||||
|
zap.Uint32("hash", hash.Hash),
|
||||||
|
zap.Int64("revision", hash.Revision),
|
||||||
|
zap.Int64("compact-revision", hash.CompactRevision),
|
||||||
|
)
|
||||||
|
s.hashMu.Lock()
|
||||||
|
defer s.hashMu.Unlock()
|
||||||
|
s.hashes = append(s.hashes, hash)
|
||||||
|
sort.Slice(s.hashes, func(i, j int) bool {
|
||||||
|
return s.hashes[i].Revision < s.hashes[j].Revision
|
||||||
|
})
|
||||||
|
if len(s.hashes) > hashStorageMaxSize {
|
||||||
|
s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -187,3 +187,53 @@ func pickKey(i int64) string {
|
|||||||
panic("Can't count")
|
panic("Can't count")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHasherStore(t *testing.T) {
|
||||||
|
lg := zaptest.NewLogger(t)
|
||||||
|
s := newHashStorage(lg, newFakeStore(lg))
|
||||||
|
var hashes []KeyValueHash
|
||||||
|
for i := 0; i < hashStorageMaxSize; i++ {
|
||||||
|
hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100}
|
||||||
|
hashes = append(hashes, hash)
|
||||||
|
s.Store(hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, want := range hashes {
|
||||||
|
got, _, err := s.HashByRev(want.Revision)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if want.Hash != got.Hash {
|
||||||
|
t.Errorf("Expected stored hash to match, got: %d, expected: %d", want.Hash, got.Hash)
|
||||||
|
}
|
||||||
|
if want.Revision != got.Revision {
|
||||||
|
t.Errorf("Expected stored revision to match, got: %d, expected: %d", want.Revision, got.Revision)
|
||||||
|
}
|
||||||
|
if want.CompactRevision != got.CompactRevision {
|
||||||
|
t.Errorf("Expected stored compact revision to match, got: %d, expected: %d", want.CompactRevision, got.CompactRevision)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHasherStoreFull(t *testing.T) {
|
||||||
|
lg := zaptest.NewLogger(t)
|
||||||
|
s := newHashStorage(lg, newFakeStore(lg))
|
||||||
|
var minRevision int64 = 100
|
||||||
|
var maxRevision = minRevision + hashStorageMaxSize
|
||||||
|
for i := 0; i < hashStorageMaxSize; i++ {
|
||||||
|
s.Store(KeyValueHash{Revision: int64(i) + minRevision})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash for old revision should be discarded as storage is already full
|
||||||
|
s.Store(KeyValueHash{Revision: minRevision - 1})
|
||||||
|
hash, _, err := s.HashByRev(minRevision - 1)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Expected an error as old revision should be discarded, got: %v", hash)
|
||||||
|
}
|
||||||
|
// Hash for new revision should be stored even when storage is full
|
||||||
|
s.Store(KeyValueHash{Revision: maxRevision + 1})
|
||||||
|
_, _, err = s.HashByRev(maxRevision + 1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't expect error for new revision, err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -114,7 +114,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
|||||||
|
|
||||||
lg: lg,
|
lg: lg,
|
||||||
}
|
}
|
||||||
s.hashes = newHashStorage(s)
|
s.hashes = newHashStorage(lg, s)
|
||||||
s.ReadView = &readView{s}
|
s.ReadView = &readView{s}
|
||||||
s.WriteView = &writeView{s}
|
s.WriteView = &writeView{s}
|
||||||
if s.le != nil {
|
if s.le != nil {
|
||||||
@ -230,11 +230,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
|
|||||||
s.compactBarrier(ctx, ch)
|
s.compactBarrier(ctx, ch)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, err := s.scheduleCompaction(rev, prevCompactRev); err != nil {
|
hash, err := s.scheduleCompaction(rev, prevCompactRev)
|
||||||
|
if err != nil {
|
||||||
s.lg.Warn("Failed compaction", zap.Error(err))
|
s.lg.Warn("Failed compaction", zap.Error(err))
|
||||||
s.compactBarrier(context.TODO(), ch)
|
s.compactBarrier(context.TODO(), ch)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
s.hashes.Store(hash)
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -861,6 +861,7 @@ func newFakeStore(lg *zap.Logger) *store {
|
|||||||
lg: lg,
|
lg: lg,
|
||||||
}
|
}
|
||||||
s.ReadView, s.WriteView = &readView{s}, &writeView{s}
|
s.ReadView, s.WriteView = &readView{s}, &writeView{s}
|
||||||
|
s.hashes = newHashStorage(lg, s)
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user