Fix storing compaction hash with multiple ongoing compaction requests

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2024-05-30 20:56:44 +02:00
parent 2f216dcc94
commit c87d197d49

View File

@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool {
defer tx.RUnlock() defer tx.RUnlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx) scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx) finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact))
return completed
} }
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} { func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} {
ch := make(chan struct{}) ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) { j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil { if ctx.Err() != nil {
s.compactBarrier(ctx, ch) s.compactBarrier(ctx, ch)
return return
} }
tx := s.b.ReadTx()
tx.RLock()
finishedCompact, _ := UnsafeReadFinishedCompact(tx)
tx.RUnlock()
hash, err := s.scheduleCompaction(rev, prevCompactRev) hash, err := s.scheduleCompaction(rev, prevCompactRev)
if err != nil { if err != nil {
s.lg.Warn("Failed compaction", zap.Error(err)) s.lg.Warn("Failed compaction", zap.Error(err))
@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
} }
// Only store the hash value if the previous hash is completed, i.e. this compaction // Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919. // hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted { if finishedCompact == prevCompactRev {
s.hashes.Store(hash) s.hashes.Store(hash)
} else { } else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
} }
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev) ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil { if err != nil {
return ch, err return ch, err
} }
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil return s.compact(traceutil.TODO(), rev, prevCompactRev), nil
} }
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock() s.mu.Lock()
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev) ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision") trace.Step("check and update compact revision")
if err != nil { if err != nil {
@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
} }
s.mu.Unlock() s.mu.Unlock()
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil return s.compact(trace, rev, prevCompactRev), nil
} }
func (s *store) Commit() { func (s *store) Commit() {