From 501c80fbbc6b1a385d611a3f02eb150d60730c76 Mon Sep 17 00:00:00 2001 From: Iwasaki Yudai Date: Thu, 22 Mar 2018 16:34:59 -0700 Subject: [PATCH] compactor: adjust interval for period <1-hour --- compactor/periodic.go | 118 +++++++++++++++++++++++++++---------- compactor/periodic_test.go | 82 ++++++++++++++++++-------- 2 files changed, 143 insertions(+), 57 deletions(-) diff --git a/compactor/periodic.go b/compactor/periodic.go index 9e83191fa..09953528e 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -61,75 +61,129 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact return t } -// periodDivisor divides Periodic.period in into checkCompactInterval duration -const periodDivisor = 10 - -// Run runs periodic compactor. func (t *Periodic) Run() { - interval := t.period / time.Duration(periodDivisor) + fetchInterval := t.getFetchInterval() + retryInterval := t.getRetryInterval() + retentions := int(t.period/fetchInterval) + 1 // number of revs to keep for t.period + notify := make(chan struct{}, 1) + + // periodically updates t.revs and notify to the other goroutine go func() { - initialWait := t.clock.Now() for { - t.revs = append(t.revs, t.rg.Rev()) + rev := t.rg.Rev() + t.mu.Lock() + t.revs = append(t.revs, rev) + if len(t.revs) > retentions { + t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago + } + t.mu.Unlock() + + select { + case notify <- struct{}{}: + default: + // compaction can take time more than interval + } + select { case <-t.ctx.Done(): return - case <-t.clock.After(interval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() - if p { - continue - } + case <-t.clock.After(fetchInterval): + } + } + }() + + // run compaction triggered by the other goroutine thorough the notify channel + // or internal periodic retry + go func() { + var lastCompactedRev int64 + for { + select { + case <-t.ctx.Done(): + return + case <-notify: + // from the other goroutine + case <-t.clock.After(retryInterval): + // for retry + // when t.rev is not updated, this event will be ignored later, + // so we don't need to think about race with <-notify. } - // wait up to initial given period - if t.clock.Now().Sub(initialWait) < t.period { + t.mu.Lock() + p := t.paused + rev := t.revs[0] + len := len(t.revs) + t.mu.Unlock() + if p { continue } - rev, remaining := t.getRev() - if rev < 0 { + // it's too early to start working + if len != retentions { + continue + } + + // if t.revs is not updated, we can ignore the event. + // it's not the first time to try comapction in this interval. + if rev == lastCompactedRev { continue } plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { - // move to next sliding window - t.revs = remaining plog.Noticef("Finished auto-compaction at revision %d", rev) + lastCompactedRev = rev } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", interval) + plog.Noticef("Retry after %s", retryInterval) } } }() } -// Stop stops periodic compactor. +// if given compaction period x is <1-hour, compact every x duration. +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) +// if given compaction period x is >1-hour, compact every hour. +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) +func (t *Periodic) getFetchInterval() time.Duration { + itv := t.period + if itv > time.Hour { + itv = time.Hour + } + return itv +} + +const retryDivisor = 10 + +func (t *Periodic) getRetryInterval() time.Duration { + itv := t.period / retryDivisor + // we don't want to too aggressive retries + // and also jump between 6-minute through 60-minute + if itv < (6 * time.Minute) { // t.period is less than hour + // if t.period is less than 6-minute, + // retry interval is t.period. + // if we divide byretryDivisor, it's too aggressive + if t.period < 6*time.Minute { + itv = t.period + } else { + itv = 6 * time.Minute + } + } + return itv +} + func (t *Periodic) Stop() { t.cancel() } -// Pause pauses periodic compactor. func (t *Periodic) Pause() { t.mu.Lock() defer t.mu.Unlock() t.paused = true } -// Resume resumes periodic compactor. func (t *Periodic) Resume() { t.mu.Lock() defer t.mu.Unlock() t.paused = false } - -func (t *Periodic) getRev() (int64, []int64) { - i := len(t.revs) - periodDivisor - if i < 0 { - return -1, t.revs - } - return t.revs[i], t.revs[i+1:] -} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index f039a8a76..d4ad5cd03 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -25,7 +25,7 @@ import ( "github.com/jonboulle/clockwork" ) -func TestPeriodic(t *testing.T) { +func TestPeriodicHourly(t *testing.T) { retentionHours := 2 retentionDuration := time.Duration(retentionHours) * time.Hour @@ -36,31 +36,59 @@ func TestPeriodic(t *testing.T) { tb.Run() defer tb.Stop() - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - // simulate 5 hours worth of intervals. - for i := 0; i < n/retentionHours*5; i++ { + // simulate 5 hours + + for i := 0; i < 5; i++ { rg.Wait(1) - fc.Advance(checkCompactInterval) + fc.Advance(time.Hour) // compaction doesn't happen til 2 hours elapses. - if i < n { + if i < retentionHours { continue } - // after 2 hours, compaction happens at every checkCompactInterval. + // after 2 hours, compaction happens at every interval. + // at i = 3, t.revs = [1(2h-ago,T=0h), 2(1h-ago,T=1h), 3(now,T=2h)] (len=3) (rev starts from 1) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(i + 1 - n) + expectedRevision := int64(i - 1) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } } +} + +func TestPeriodicMinutes(t *testing.T) { + retentionMinutes := 23 + retentionDuration := time.Duration(retentionMinutes) * time.Minute + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := newPeriodic(fc, retentionDuration, rg, compactable) + + tb.Run() + defer tb.Stop() + + // simulate 115 (23 * 5) minutes + for i := 0; i < 5; i++ { + rg.Wait(1) + fc.Advance(retentionDuration) + + // notting happens at T=0 + if i == 0 { + continue + } + // from T=23m (i=1), compaction happens at every interval + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(i) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } - // unblock the rev getter, so we can stop the compactor routine. - _, err := rg.Wait(1) - if err != nil { - t.Fatal(err) } } @@ -75,13 +103,17 @@ func TestPeriodicPause(t *testing.T) { tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - for i := 0; i < 3*n; i++ { - rg.Wait(1) - fc.Advance(checkCompactInterval) - } - // tb ends up waiting for the clock + // T=0 + rg.Wait(1) // t.revs = [1] + fc.Advance(time.Hour) + // T=1h + rg.Wait(1) // t.revs = [1, 2] + fc.Advance(time.Hour) + // T=2h + rg.Wait(1) // t.revs = [2, 3] + fc.Advance(time.Hour) + // T=3h + rg.Wait(1) // t.revs = [3, 4] select { case a := <-compactable.Chan(): @@ -92,15 +124,15 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3:06 - rg.Wait(1) - fc.Advance(checkCompactInterval) + // unblock clock, will kick off a compaction at T=3h6m by retry + fc.Advance(time.Minute * 6) + // T=3h6m a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from hour 2:06 - wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} + // compact the revision from T=3h + wreq := &pb.CompactionRequest{Revision: int64(3)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) }