Merge c87d197d49a89f6bbdc4c4a23f318a9054145699 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Marek Siarkowicz 2024-09-26 22:00:06 +01:00 committed by GitHub
commit 9799eccd2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool {
defer tx.RUnlock() defer tx.RUnlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx) scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx) finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact))
return completed
} }
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} { func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} {
ch := make(chan struct{}) ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) { j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil { if ctx.Err() != nil {
s.compactBarrier(ctx, ch) s.compactBarrier(ctx, ch)
return return
} }
tx := s.b.ReadTx()
tx.RLock()
finishedCompact, _ := UnsafeReadFinishedCompact(tx)
tx.RUnlock()
hash, err := s.scheduleCompaction(rev, prevCompactRev) hash, err := s.scheduleCompaction(rev, prevCompactRev)
if err != nil { if err != nil {
s.lg.Warn("Failed compaction", zap.Error(err)) s.lg.Warn("Failed compaction", zap.Error(err))
@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
} }
// Only store the hash value if the previous hash is completed, i.e. this compaction // Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919. // hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted { if finishedCompact == prevCompactRev {
s.hashes.Store(hash) s.hashes.Store(hash)
} else { } else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
} }
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev) ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil { if err != nil {
return ch, err return ch, err
} }
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil return s.compact(traceutil.TODO(), rev, prevCompactRev), nil
} }
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock() s.mu.Lock()
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev) ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision") trace.Step("check and update compact revision")
if err != nil { if err != nil {
@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
} }
s.mu.Unlock() s.mu.Unlock()
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil return s.compact(trace, rev, prevCompactRev), nil
} }
func (s *store) Commit() { func (s *store) Commit() {