diff --git a/server/etcdserver/api/v3compactor/periodic.go b/server/etcdserver/api/v3compactor/periodic.go index 380fdfe2a..853c1a9e7 100644 --- a/server/etcdserver/api/v3compactor/periodic.go +++ b/server/etcdserver/api/v3compactor/periodic.go @@ -102,6 +102,7 @@ func (pc *Periodic) Run() { retentions := pc.getRetentions() go func() { + lastRevision := int64(0) lastSuccess := pc.clock.Now() baseInterval := pc.period for { @@ -121,8 +122,8 @@ func (pc *Periodic) Run() { continue } } - - if pc.clock.Now().Sub(lastSuccess) < baseInterval { + rev := pc.revs[0] + if pc.clock.Now().Sub(lastSuccess) < baseInterval || rev == lastRevision { continue } @@ -130,7 +131,6 @@ func (pc *Periodic) Run() { if baseInterval == pc.period { baseInterval = compactInterval } - rev := pc.revs[0] pc.lg.Info( "starting auto periodic compaction", @@ -146,6 +146,7 @@ func (pc *Periodic) Run() { zap.Duration("compact-period", pc.period), zap.Duration("took", pc.clock.Now().Sub(startTime)), ) + lastRevision = rev lastSuccess = pc.clock.Now() } else { pc.lg.Warn( diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go index 07313cc68..b552af61b 100644 --- a/server/etcdserver/api/v3compactor/periodic_test.go +++ b/server/etcdserver/api/v3compactor/periodic_test.go @@ -15,6 +15,7 @@ package v3compactor import ( + "errors" "reflect" "testing" "time" @@ -172,3 +173,68 @@ func TestPeriodicPause(t *testing.T) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) } } + +func TestPeriodicSkipRevNotChange(t *testing.T) { + retentionMinutes := 5 + retentionDuration := time.Duration(retentionMinutes) * time.Minute + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) + + tb.Run() + defer tb.Stop() + + initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + + // first compaction happens til 5 minutes elapsed + for i := 0; i < initialIntervals; i++ { + // every time set the same revision with 100 + rg.SetRev(int64(100)) + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + + // first compaction the compact revision will be 100+1 + expectedRevision := int64(100 + 1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // compaction doesn't happens at every interval since revision not change + for i := 0; i < 5; i++ { + for j := 0; j < intervalsPerPeriod; j++ { + rg.SetRev(int64(100)) + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + _, err := compactable.Wait(1) + if err == nil { + t.Fatal(errors.New("should not compact since the revision not change")) + } + } + + // when revision changed, compaction is normally + for i := 0; i < initialIntervals; i++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + a, err = compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + + expectedRevision = int64(100 + 2) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } +}