mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Before this patch, a tombstone could be deleted if its revision was equal to the compacted revision. As a result, watch subscribers would never receive the corresponding DELETE event. Based on the Compact API[1], we should keep the tombstone revision if it is not less than the compaction revision. > CompactionRequest compacts the key-value store up to a given revision. > All superseded keys with a revision less than the compaction revision > will be removed. [1]: https://etcd.io/docs/latest/dev-guide/api_reference_v3/ Signed-off-by: Wei Fu <fuweid89@gmail.com> (cherry picked from commit bbdc94181a6d67904b575ad936c20d1be10e220c) Signed-off-by: Wei Fu <fuweid89@gmail.com>
177 lines
4.5 KiB
Go
177 lines
4.5 KiB
Go
// Copyright 2022 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package mvcc
|
|
|
|
import (
|
|
"hash"
|
|
"hash/crc32"
|
|
"sort"
|
|
"sync"
|
|
|
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
|
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// hashStorageMaxSize caps how many KeyValueHash entries hashStorage retains;
// Store drops the lowest-revision entries once this count is exceeded.
const hashStorageMaxSize = 10
|
|
|
|
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
|
h := newKVHasher(compactRevision, revision, keep)
|
|
err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
|
|
h.WriteKeyValue(k, v)
|
|
return nil
|
|
})
|
|
return h.Hash(), err
|
|
}
|
|
|
|
type kvHasher struct {
|
|
hash hash.Hash32
|
|
compactRevision int64
|
|
revision int64
|
|
keep map[revision]struct{}
|
|
}
|
|
|
|
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
|
|
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
|
h.Write(buckets.Key.Name())
|
|
return kvHasher{
|
|
hash: h,
|
|
compactRevision: compactRev,
|
|
revision: rev,
|
|
keep: keep,
|
|
}
|
|
}
|
|
|
|
func (h *kvHasher) WriteKeyValue(k, v []byte) {
|
|
kr := bytesToRev(k)
|
|
upper := revision{main: h.revision + 1}
|
|
if !upper.GreaterThan(kr) {
|
|
return
|
|
}
|
|
|
|
isTombstoneRev := isTombstone(k)
|
|
|
|
lower := revision{main: h.compactRevision + 1}
|
|
// skip revisions that are scheduled for deletion
|
|
// due to compacting; don't skip if there isn't one.
|
|
if lower.GreaterThan(kr) && len(h.keep) > 0 {
|
|
if _, ok := h.keep[kr]; !ok {
|
|
return
|
|
}
|
|
}
|
|
|
|
// When performing compaction, if the compacted revision is a
|
|
// tombstone, older versions (<= 3.5.15 or <= 3.4.33) will delete
|
|
// the tombstone. But newer versions (> 3.5.15 or > 3.4.33) won't
|
|
// delete it. So we should skip the tombstone in such cases when
|
|
// computing the hash to ensure that both older and newer versions
|
|
// can always generate the same hash values.
|
|
if kr.main == h.compactRevision && isTombstoneRev {
|
|
return
|
|
}
|
|
|
|
h.hash.Write(k)
|
|
h.hash.Write(v)
|
|
}
|
|
|
|
func (h *kvHasher) Hash() KeyValueHash {
|
|
return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision}
|
|
}
|
|
|
|
// KeyValueHash is a checksum of key-value data paired with the revision
// bounds it was computed for.
type KeyValueHash struct {
	// Hash is the 32-bit checksum value.
	Hash uint32
	// CompactRevision is the compaction revision the hash was computed with.
	CompactRevision int64
	// Revision is the upper revision bound the hash was computed with.
	Revision int64
}
|
|
|
|
type HashStorage interface {
|
|
// Hash computes the hash of the KV's backend.
|
|
Hash() (hash uint32, revision int64, err error)
|
|
|
|
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
|
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
|
|
|
|
// Store adds hash value in local cache, allowing it can be returned by HashByRev.
|
|
Store(valueHash KeyValueHash)
|
|
|
|
// Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes.
|
|
Hashes() []KeyValueHash
|
|
}
|
|
|
|
type hashStorage struct {
|
|
store *store
|
|
hashMu sync.RWMutex
|
|
hashes []KeyValueHash
|
|
lg *zap.Logger
|
|
}
|
|
|
|
func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
|
|
return &hashStorage{
|
|
store: s,
|
|
lg: lg,
|
|
}
|
|
}
|
|
|
|
func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
|
|
return s.store.hash()
|
|
}
|
|
|
|
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
|
|
s.hashMu.RLock()
|
|
for _, h := range s.hashes {
|
|
if rev == h.Revision {
|
|
s.hashMu.RUnlock()
|
|
|
|
s.store.revMu.RLock()
|
|
currentRev := s.store.currentRev
|
|
s.store.revMu.RUnlock()
|
|
return h, currentRev, nil
|
|
}
|
|
}
|
|
s.hashMu.RUnlock()
|
|
|
|
return s.store.hashByRev(rev)
|
|
}
|
|
|
|
func (s *hashStorage) Store(hash KeyValueHash) {
|
|
s.lg.Info("storing new hash",
|
|
zap.Uint32("hash", hash.Hash),
|
|
zap.Int64("revision", hash.Revision),
|
|
zap.Int64("compact-revision", hash.CompactRevision),
|
|
)
|
|
s.hashMu.Lock()
|
|
defer s.hashMu.Unlock()
|
|
s.hashes = append(s.hashes, hash)
|
|
sort.Slice(s.hashes, func(i, j int) bool {
|
|
return s.hashes[i].Revision < s.hashes[j].Revision
|
|
})
|
|
if len(s.hashes) > hashStorageMaxSize {
|
|
s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
|
|
}
|
|
}
|
|
|
|
func (s *hashStorage) Hashes() []KeyValueHash {
|
|
s.hashMu.RLock()
|
|
// Copy out hashes under lock just to be safe
|
|
hashes := make([]KeyValueHash, 0, len(s.hashes))
|
|
for _, hash := range s.hashes {
|
|
hashes = append(hashes, hash)
|
|
}
|
|
s.hashMu.RUnlock()
|
|
return hashes
|
|
}
|