From 8cffdbafbad56f26782fe7596c29d1d8d4f04ee7 Mon Sep 17 00:00:00 2001 From: kkkkun Date: Sun, 11 Jun 2023 22:26:55 +0800 Subject: [PATCH] etcdserver: fix corruption check when server has just been compacted Signed-off-by: kkkkun --- server/mvcc/kvstore.go | 2 +- server/mvcc/kvstore_test.go | 64 +++++++++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index c952d3379..fc48380b5 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -175,7 +175,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e compactRev, currentRev = s.compactMainRev, s.currentRev s.revMu.RUnlock() - if rev > 0 && rev <= compactRev { + if rev > 0 && rev < compactRev { s.mu.RUnlock() return KeyValueHash{}, 0, ErrCompacted } else if rev > 0 && rev > currentRev { diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 81dc27b60..548e7ac2b 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestStoreRev(t *testing.T) { @@ -536,7 +537,7 @@ type hashKVResult struct { func TestHashKVWhenCompacting(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) rev := 10000 for i := 2; i <= rev; i++ { @@ -544,9 +545,10 @@ func TestHashKVWhenCompacting(t *testing.T) { } hashCompactc := make(chan hashKVResult, 1) - - donec := make(chan struct{}) var wg sync.WaitGroup + donec := make(chan struct{}) + + // Call HashByRev(10000) in multiple goroutines until donec is closed for i := 0; i < 10; i++ { wg.Add(1) go func() { @@ -565,10 +567,12 @@ func TestHashKVWhenCompacting(t *testing.T) { }() } + // Check computed hashes by HashByRev are correct in a goroutine, until donec is closed + wg.Add(1) go func() { - defer close(donec) + defer wg.Done() revHash := make(map[int64]uint32) - for round := 0; round < 1000; round++ { + for { r := <-hashCompactc if revHash[r.compactRev] == 0 { revHash[r.compactRev] = r.hash @@ -576,17 +580,26 @@ func TestHashKVWhenCompacting(t *testing.T) { if r.hash != revHash[r.compactRev] { t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev]) } + + select { + case <-donec: + return + default: + } } }() - wg.Add(1) + // Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished go func() { - defer wg.Done() + defer close(donec) for i := 100; i >= 0; i-- { - _, err := s.Compact(traceutil.TODO(), int64(rev-1-i)) + _, err := s.Compact(traceutil.TODO(), int64(rev-i)) if err != nil { t.Error(err) } + // Wait for the compaction job to finish + s.fifoSched.WaitFinish(1) + // Leave time for calls to HashByRev to take place after each compaction time.Sleep(10 * time.Millisecond) } }() @@ -599,12 +612,45 @@ func TestHashKVWhenCompacting(t *testing.T) { } } +// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called +// with a past revision (lower than compacted), a future revision, and the exact compacted revision +func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { + b, tmpPath := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) + + rev := 10000 + compactRev := rev / 2 + + for i := 2; i <= rev; i++ { + s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) + } + if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil { + t.Fatal(err) + } + + _, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1)) + if errFutureRev != ErrFutureRev { + t.Error(errFutureRev) + } + + _, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1)) + if errPastRev != ErrCompacted { + t.Error(errPastRev) + } + + _, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev)) + if errCompactRev != nil { + t.Error(errCompactRev) + } +} + // TestHashKVZeroRevision ensures that "HashByRev(0)" computes // correct hash value with latest revision. func TestHashKVZeroRevision(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) rev := 10000 for i := 2; i <= rev; i++ {