diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 8cfe96c99..d2b976f74 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -251,6 +251,10 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6 return hashByRev.hash, hashByRev.revision, hashByRev.err } +func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) { + panic("not implemented") +} + func (f *fakeHasher) ReqTimeout() time.Duration { f.actions = append(f.actions, "ReqTimeout()") return time.Second diff --git a/server/storage/mvcc/hash.go b/server/storage/mvcc/hash.go index 9e63224c6..696ddd216 100644 --- a/server/storage/mvcc/hash.go +++ b/server/storage/mvcc/hash.go @@ -17,9 +17,16 @@ package mvcc import ( "hash" "hash/crc32" + "sort" + "sync" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" + "go.uber.org/zap" +) + +const ( + hashStorageMaxSize = 10 ) func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { @@ -83,15 +90,22 @@ type HashStorage interface { // HashByRev computes the hash of all MVCC revisions up to a given revision. HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) + + // Store adds hash value in local cache, allowing it can be returned by HashByRev. + Store(valueHash KeyValueHash) } type hashStorage struct { - store *store + store *store + hashMu sync.RWMutex + hashes []KeyValueHash + lg *zap.Logger } -func newHashStorage(s *store) HashStorage { +func newHashStorage(lg *zap.Logger, s *store) *hashStorage { return &hashStorage{ store: s, + lg: lg, } } @@ -100,5 +114,35 @@ func (s *hashStorage) Hash() (hash uint32, revision int64, err error) { } func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) { + s.hashMu.RLock() + for _, h := range s.hashes { + if rev == h.Revision { + s.hashMu.RUnlock() + + s.store.revMu.RLock() + currentRev := s.store.currentRev + s.store.revMu.RUnlock() + return h, currentRev, nil + } + } + s.hashMu.RUnlock() + return s.store.hashByRev(rev) } + +func (s *hashStorage) Store(hash KeyValueHash) { + s.lg.Info("storing new hash", + zap.Uint32("hash", hash.Hash), + zap.Int64("revision", hash.Revision), + zap.Int64("compact-revision", hash.CompactRevision), + ) + s.hashMu.Lock() + defer s.hashMu.Unlock() + s.hashes = append(s.hashes, hash) + sort.Slice(s.hashes, func(i, j int) bool { + return s.hashes[i].Revision < s.hashes[j].Revision + }) + if len(s.hashes) > hashStorageMaxSize { + s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:] + } +} diff --git a/server/storage/mvcc/hash_test.go b/server/storage/mvcc/hash_test.go index 179e8d6e6..2e0b0b58c 100644 --- a/server/storage/mvcc/hash_test.go +++ b/server/storage/mvcc/hash_test.go @@ -187,3 +187,53 @@ func pickKey(i int64) string { panic("Can't count") } } + +func TestHasherStore(t *testing.T) { + lg := zaptest.NewLogger(t) + s := newHashStorage(lg, newFakeStore(lg)) + var hashes []KeyValueHash + for i := 0; i < hashStorageMaxSize; i++ { + hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100} + hashes = append(hashes, hash) + s.Store(hash) + } + + for _, want := range hashes { + got, _, err := s.HashByRev(want.Revision) + if err != nil { + t.Fatal(err) + } + if want.Hash != got.Hash { + t.Errorf("Expected stored hash to match, got: %d, expected: %d", want.Hash, got.Hash) + } + if want.Revision != got.Revision { + t.Errorf("Expected stored revision to match, got: %d, expected: %d", want.Revision, got.Revision) + } + if want.CompactRevision != got.CompactRevision { + t.Errorf("Expected stored compact revision to match, got: %d, expected: %d", want.CompactRevision, got.CompactRevision) + } + } +} + +func TestHasherStoreFull(t *testing.T) { + lg := zaptest.NewLogger(t) + s := newHashStorage(lg, newFakeStore(lg)) + var minRevision int64 = 100 + var maxRevision = minRevision + hashStorageMaxSize + for i := 0; i < hashStorageMaxSize; i++ { + s.Store(KeyValueHash{Revision: int64(i) + minRevision}) + } + + // Hash for old revision should be discarded as storage is already full + s.Store(KeyValueHash{Revision: minRevision - 1}) + hash, _, err := s.HashByRev(minRevision - 1) + if err == nil { + t.Errorf("Expected an error as old revision should be discarded, got: %v", hash) + } + // Hash for new revision should be stored even when storage is full + s.Store(KeyValueHash{Revision: maxRevision + 1}) + _, _, err = s.HashByRev(maxRevision + 1) + if err != nil { + t.Errorf("Didn't expect error for new revision, err: %v", err) + } +} diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index bbb3b74cd..c3da906b7 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -114,7 +114,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi lg: lg, } - s.hashes = newHashStorage(s) + s.hashes = newHashStorage(lg, s) s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { @@ -230,11 +230,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch s.compactBarrier(ctx, ch) return } - if _, err := s.scheduleCompaction(rev, prevCompactRev); err != nil { + hash, err := s.scheduleCompaction(rev, prevCompactRev) + if err != nil { s.lg.Warn("Failed compaction", zap.Error(err)) s.compactBarrier(context.TODO(), ch) return } + s.hashes.Store(hash) close(ch) } diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 10fdf0995..099bad58e 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -861,6 +861,7 @@ func newFakeStore(lg *zap.Logger) *store { lg: lg, } s.ReadView, s.WriteView = &readView{s}, &writeView{s} + s.hashes = newHashStorage(lg, s) return s }