Merge pull request #13829 from qsyqian/feature/skip_compact_when_rev_not_change

skip compact when rev not change at period compact mode
This commit is contained in:
Sahdev Zala 2022-06-08 22:54:39 -04:00 committed by GitHub
commit d9e5460ca8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 3 deletions

View File

@ -102,6 +102,7 @@ func (pc *Periodic) Run() {
retentions := pc.getRetentions()
go func() {
lastRevision := int64(0)
lastSuccess := pc.clock.Now()
baseInterval := pc.period
for {
@ -121,8 +122,8 @@ func (pc *Periodic) Run() {
continue
}
}
if pc.clock.Now().Sub(lastSuccess) < baseInterval {
rev := pc.revs[0]
if pc.clock.Now().Sub(lastSuccess) < baseInterval || rev == lastRevision {
continue
}
@ -130,7 +131,6 @@ func (pc *Periodic) Run() {
if baseInterval == pc.period {
baseInterval = compactInterval
}
rev := pc.revs[0]
pc.lg.Info(
"starting auto periodic compaction",
@ -146,6 +146,7 @@ func (pc *Periodic) Run() {
zap.Duration("compact-period", pc.period),
zap.Duration("took", pc.clock.Now().Sub(startTime)),
)
lastRevision = rev
lastSuccess = pc.clock.Now()
} else {
pc.lg.Warn(

View File

@ -15,6 +15,7 @@
package v3compactor
import (
"errors"
"reflect"
"testing"
"time"
@ -172,3 +173,68 @@ func TestPeriodicPause(t *testing.T) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
}
func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
tb.Run()
defer tb.Stop()
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
// first compaction happens til 5 minutes elapsed
for i := 0; i < initialIntervals; i++ {
// every time set the same revision with 100
rg.SetRev(int64(100))
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
// very first compaction
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// first compaction the compact revision will be 100+1
expectedRevision := int64(100 + 1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
// compaction doesn't happens at every interval since revision not change
for i := 0; i < 5; i++ {
for j := 0; j < intervalsPerPeriod; j++ {
rg.SetRev(int64(100))
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
_, err := compactable.Wait(1)
if err == nil {
t.Fatal(errors.New("should not compact since the revision not change"))
}
}
// when revision changed, compaction is normally
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
a, err = compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision = int64(100 + 2)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}