From bce0d0b799ea3a47d56f4666cad5176cc80d7d88 Mon Sep 17 00:00:00 2001 From: kkkkun Date: Sun, 11 Jun 2023 22:01:31 +0800 Subject: [PATCH] etcdserver: fix corruption check when server has just been compacted Signed-off-by: kkkkun --- mvcc/kvstore.go | 2 +- mvcc/kvstore_test.go | 64 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index d688dcd1c..6dc4f30bc 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -198,7 +198,7 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev compactRev, currentRev = s.compactMainRev, s.currentRev s.revMu.RUnlock() - if rev > 0 && rev <= compactRev { + if rev > 0 && rev < compactRev { s.mu.RUnlock() return 0, 0, compactRev, ErrCompacted } else if rev > 0 && rev > currentRev { diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 672878fa6..44baee21c 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -536,7 +536,7 @@ type hashKVResult struct { func TestHashKVWhenCompacting(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) rev := 10000 for i := 2; i <= rev; i++ { @@ -544,9 +544,10 @@ func TestHashKVWhenCompacting(t *testing.T) { } hashCompactc := make(chan hashKVResult, 1) - - donec := make(chan struct{}) var wg sync.WaitGroup + donec := make(chan struct{}) + + // Call HashByRev(10000) in multiple goroutines until donec is closed for i := 0; i < 10; i++ { wg.Add(1) go func() { @@ -565,10 +566,12 @@ func TestHashKVWhenCompacting(t *testing.T) { }() } + // Check computed hashes by HashByRev are correct in a goroutine, until donec is closed + wg.Add(1) go func() { - defer close(donec) + defer wg.Done() revHash := make(map[int64]uint32) - for round := 0; round < 1000; round++ { + for { r := <-hashCompactc if revHash[r.compactRev] == 0 { revHash[r.compactRev] = r.hash @@ -576,21 +579,29 @@ func TestHashKVWhenCompacting(t *testing.T) { if r.hash != revHash[r.compactRev] { t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev]) } + + select { + case <-donec: + return + default: + } } }() - wg.Add(1) + // Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished go func() { - defer wg.Done() + defer close(donec) for i := 100; i >= 0; i-- { - _, err := s.Compact(traceutil.TODO(), int64(rev-1-i)) + _, err := s.Compact(traceutil.TODO(), int64(rev-i)) if err != nil { t.Error(err) } + // Wait for the compaction job to finish + s.fifoSched.WaitFinish(1) + // Leave time for calls to HashByRev to take place after each compaction time.Sleep(10 * time.Millisecond) } }() - select { case <-donec: wg.Wait() @@ -599,6 +610,39 @@ func TestHashKVWhenCompacting(t *testing.T) { } } +// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called +// with a past revision (lower than compacted), a future revision, and the exact compacted revision +func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + defer cleanup(s, b, tmpPath) + + rev := 10000 + compactRev := rev / 2 + + for i := 2; i <= rev; i++ { + s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) + } + if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil { + t.Fatal(err) + } + + _, _, _, errFutureRev := s.HashByRev(int64(rev + 1)) + if errFutureRev != ErrFutureRev { + t.Error(errFutureRev) + } + + _, _, _, errPastRev := s.HashByRev(int64(compactRev - 1)) + if errPastRev != ErrCompacted { + t.Error(errPastRev) + } + + _, _, _, errCompactRev := s.HashByRev(int64(compactRev)) + if errCompactRev != nil { + t.Error(errCompactRev) + } +} + // TestHashKVZeroRevision ensures that "HashByRev(0)" computes // correct hash value with latest revision. func TestHashKVZeroRevision(t *testing.T) { @@ -653,7 +697,7 @@ func TestTxnPut(t *testing.T) { func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) // write something to read later s.Put([]byte("foo"), []byte("bar"), lease.NoLease)