mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: fix corruption check when server has just been compacted
When a key-value store corruption check happens immediately after a compaction, the revision at which the key-value store hash is computed, is the compacted revision itself. In that case, the hash computation logic was incorrect because it returned an ErrCompacted error; this error should instead be returned when the revision at which the key-value store is hashed, is strictly lower than the compacted revision. Fixes #14325 Signed-off-by: Jeremy Leach <44558776+jbml@users.noreply.github.com>
This commit is contained in:
parent
b886bbc89f
commit
cc1e245368
@ -177,7 +177,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
|
|||||||
compactRev, currentRev = s.compactMainRev, s.currentRev
|
compactRev, currentRev = s.compactMainRev, s.currentRev
|
||||||
s.revMu.RUnlock()
|
s.revMu.RUnlock()
|
||||||
|
|
||||||
if rev > 0 && rev <= compactRev {
|
if rev > 0 && rev < compactRev {
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
return KeyValueHash{}, 0, ErrCompacted
|
return KeyValueHash{}, 0, ErrCompacted
|
||||||
} else if rev > 0 && rev > currentRev {
|
} else if rev > 0 && rev > currentRev {
|
||||||
|
@ -542,9 +542,7 @@ type hashKVResult struct {
|
|||||||
func TestHashKVWhenCompacting(t *testing.T) {
|
func TestHashKVWhenCompacting(t *testing.T) {
|
||||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
defer s.Close()
|
defer cleanup(s, b, tmpPath)
|
||||||
defer b.Close()
|
|
||||||
defer os.Remove(tmpPath)
|
|
||||||
|
|
||||||
rev := 10000
|
rev := 10000
|
||||||
for i := 2; i <= rev; i++ {
|
for i := 2; i <= rev; i++ {
|
||||||
@ -552,9 +550,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
hashCompactc := make(chan hashKVResult, 1)
|
hashCompactc := make(chan hashKVResult, 1)
|
||||||
|
|
||||||
donec := make(chan struct{})
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
donec := make(chan struct{})
|
||||||
|
|
||||||
|
// Call HashByRev(10000) in multiple goroutines until donec is closed
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -573,10 +572,12 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
|
||||||
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(donec)
|
defer wg.Done()
|
||||||
revHash := make(map[int64]uint32)
|
revHash := make(map[int64]uint32)
|
||||||
for round := 0; round < 1000; round++ {
|
for {
|
||||||
r := <-hashCompactc
|
r := <-hashCompactc
|
||||||
if revHash[r.compactRev] == 0 {
|
if revHash[r.compactRev] == 0 {
|
||||||
revHash[r.compactRev] = r.hash
|
revHash[r.compactRev] = r.hash
|
||||||
@ -584,17 +585,26 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
if r.hash != revHash[r.compactRev] {
|
if r.hash != revHash[r.compactRev] {
|
||||||
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
|
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-donec:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Add(1)
|
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer close(donec)
|
||||||
for i := 100; i >= 0; i-- {
|
for i := 100; i >= 0; i-- {
|
||||||
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
|
_, err := s.Compact(traceutil.TODO(), int64(rev-i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
// Wait for the compaction job to finish
|
||||||
|
s.fifoSched.WaitFinish(1)
|
||||||
|
// Leave time for calls to HashByRev to take place after each compaction
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -607,12 +617,45 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
|
||||||
|
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
|
||||||
|
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
|
||||||
|
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||||
|
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
|
rev := 10000
|
||||||
|
compactRev := rev / 2
|
||||||
|
|
||||||
|
for i := 2; i <= rev; i++ {
|
||||||
|
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
|
||||||
|
}
|
||||||
|
if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
|
||||||
|
if errFutureRev != ErrFutureRev {
|
||||||
|
t.Error(errFutureRev)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
|
||||||
|
if errPastRev != ErrCompacted {
|
||||||
|
t.Error(errPastRev)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
|
||||||
|
if errCompactRev != nil {
|
||||||
|
t.Error(errCompactRev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
|
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
|
||||||
// correct hash value with latest revision.
|
// correct hash value with latest revision.
|
||||||
func TestHashKVZeroRevision(t *testing.T) {
|
func TestHashKVZeroRevision(t *testing.T) {
|
||||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
defer os.Remove(tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
rev := 10000
|
rev := 10000
|
||||||
for i := 2; i <= rev; i++ {
|
for i := 2; i <= rev; i++ {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user