From cc1e245368bdeb2a8cf451246393e26367e285f5 Mon Sep 17 00:00:00 2001 From: Jeremy Leach <44558776+jbml@users.noreply.github.com> Date: Sat, 10 Sep 2022 23:31:47 +1000 Subject: [PATCH] etcdserver: fix corruption check when server has just been compacted When a key-value store corruption check happens immediately after a compaction, the revision at which the key-value store hash is computed, is the compacted revision itself. In that case, the hash computation logic was incorrect because it returned an ErrCompacted error; this error should instead be returned when the revision at which the key-value store is hashed, is strictly lower than the compacted revision. Fixes #14325 Signed-off-by: Jeremy Leach <44558776+jbml@users.noreply.github.com> --- server/storage/mvcc/kvstore.go | 2 +- server/storage/mvcc/kvstore_test.go | 65 ++++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 442a28bae..4268841be 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -177,7 +177,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e compactRev, currentRev = s.compactMainRev, s.currentRev s.revMu.RUnlock() - if rev > 0 && rev <= compactRev { + if rev > 0 && rev < compactRev { s.mu.RUnlock() return KeyValueHash{}, 0, ErrCompacted } else if rev > 0 && rev > currentRev { diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 77b93d69a..b924609cf 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -542,9 +542,7 @@ type hashKVResult struct { func TestHashKVWhenCompacting(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer s.Close() - defer b.Close() - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) rev := 10000 for i := 2; i <= rev; i++ { @@ -552,9 +550,10 @@ func TestHashKVWhenCompacting(t *testing.T) { } hashCompactc := make(chan hashKVResult, 1) - - donec := make(chan struct{}) var wg sync.WaitGroup + donec := make(chan struct{}) + + // Call HashByRev(10000) in multiple goroutines until donec is closed for i := 0; i < 10; i++ { wg.Add(1) go func() { @@ -573,10 +572,12 @@ func TestHashKVWhenCompacting(t *testing.T) { }() } + // Check computed hashes by HashByRev are correct in a goroutine, until donec is closed + wg.Add(1) go func() { - defer close(donec) + defer wg.Done() revHash := make(map[int64]uint32) - for round := 0; round < 1000; round++ { + for { r := <-hashCompactc if revHash[r.compactRev] == 0 { revHash[r.compactRev] = r.hash @@ -584,17 +585,26 @@ func TestHashKVWhenCompacting(t *testing.T) { if r.hash != revHash[r.compactRev] { t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev]) } + + select { + case <-donec: + return + default: + } } }() - wg.Add(1) + // Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished go func() { - defer wg.Done() + defer close(donec) for i := 100; i >= 0; i-- { - _, err := s.Compact(traceutil.TODO(), int64(rev-1-i)) + _, err := s.Compact(traceutil.TODO(), int64(rev-i)) if err != nil { t.Error(err) } + // Wait for the compaction job to finish + s.fifoSched.WaitFinish(1) + // Leave time for calls to HashByRev to take place after each compaction time.Sleep(10 * time.Millisecond) } }() @@ -607,12 +617,45 @@ func TestHashKVWhenCompacting(t *testing.T) { } } +// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called +// with a past revision (lower than compacted), a future revision, and the exact compacted revision +func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { + b, tmpPath := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) + + rev := 10000 + compactRev := rev / 2 + + for i := 2; i <= rev; i++ { + s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) + } + if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil { + t.Fatal(err) + } + + _, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1)) + if errFutureRev != ErrFutureRev { + t.Error(errFutureRev) + } + + _, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1)) + if errPastRev != ErrCompacted { + t.Error(errPastRev) + } + + _, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev)) + if errCompactRev != nil { + t.Error(errCompactRev) + } +} + // TestHashKVZeroRevision ensures that "HashByRev(0)" computes // correct hash value with latest revision. func TestHashKVZeroRevision(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) rev := 10000 for i := 2; i <= rev; i++ {