From e029de320a87630d5855f727aab2d016414886f5 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 21 Mar 2018 17:04:48 -0700 Subject: [PATCH] compactor: clean up Signed-off-by: Gyuho Lee --- compactor/compactor.go | 2 -- compactor/periodic.go | 33 +++++++++++++++++++++++---------- compactor/periodic_test.go | 19 +++++-------------- compactor/revision.go | 31 ++++++++++++++++++++----------- compactor/revision_test.go | 29 ++++++++++------------------- 5 files changed, 58 insertions(+), 56 deletions(-) diff --git a/compactor/compactor.go b/compactor/compactor.go index c05722517..8100b6938 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -29,8 +29,6 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute - ModePeriodic = "periodic" ModeRevision = "revision" ) diff --git a/compactor/periodic.go b/compactor/periodic.go index 447352ec3..9e83191fa 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -46,30 +46,35 @@ type Periodic struct { // NewPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { - return &Periodic{ - clock: clockwork.NewRealClock(), + return newPeriodic(clockwork.NewRealClock(), h, rg, c) +} + +func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { + t := &Periodic{ + clock: clock, period: h, rg: rg, c: c, + revs: make([]int64, 0), } + t.ctx, t.cancel = context.WithCancel(context.Background()) + return t } // periodDivisor divides Periodic.period in into checkCompactInterval duration const periodDivisor = 10 +// Run runs periodic compactor. func (t *Periodic) Run() { - t.ctx, t.cancel = context.WithCancel(context.Background()) - t.revs = make([]int64, 0) - clock := t.clock - checkCompactInterval := t.period / time.Duration(periodDivisor) + interval := t.period / time.Duration(periodDivisor) go func() { - last := clock.Now() + initialWait := t.clock.Now() for { t.revs = append(t.revs, t.rg.Rev()) select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactInterval): + case <-t.clock.After(interval): t.mu.Lock() p := t.paused t.mu.Unlock() @@ -77,36 +82,44 @@ func (t *Periodic) Run() { continue } } - if clock.Now().Sub(last) < t.period { + + // wait up to initial given period + if t.clock.Now().Sub(initialWait) < t.period { continue } + rev, remaining := t.getRev() if rev < 0 { continue } + plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { + // move to next sliding window t.revs = remaining plog.Noticef("Finished auto-compaction at revision %d", rev) } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", checkCompactInterval) + plog.Noticef("Retry after %v", interval) } } }() } +// Stop stops periodic compactor. func (t *Periodic) Stop() { t.cancel() } +// Pause pauses periodic compactor. func (t *Periodic) Pause() { t.mu.Lock() defer t.mu.Unlock() t.paused = true } +// Resume resumes periodic compactor. func (t *Periodic) Resume() { t.mu.Lock() defer t.mu.Unlock() diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index 19abd4fdb..f039a8a76 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -21,6 +21,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "github.com/jonboulle/clockwork" ) @@ -31,12 +32,7 @@ func TestPeriodic(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := &Periodic{ - clock: fc, - period: retentionDuration, - rg: rg, - c: compactable, - } + tb := newPeriodic(fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -70,15 +66,10 @@ func TestPeriodic(t *testing.T) { func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() - compactable := &fakeCompactable{testutil.NewRecorderStream()} - rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} retentionDuration := time.Hour - tb := &Periodic{ - clock: fc, - period: retentionDuration, - rg: rg, - c: compactable, - } + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := newPeriodic(fc, retentionDuration, rg, compactable) tb.Run() tb.Pause() diff --git a/compactor/revision.go b/compactor/revision.go index 4a8761412..927e41c97 100644 --- a/compactor/revision.go +++ b/compactor/revision.go @@ -17,6 +17,7 @@ package compactor import ( "context" "sync" + "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc" @@ -43,25 +44,31 @@ type Revision struct { // NewRevision creates a new instance of Revisonal compactor that purges // the log older than retention revisions from the current revision. func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision { - return &Revision{ - clock: clockwork.NewRealClock(), + return newRevision(clockwork.NewRealClock(), retention, rg, c) +} + +func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { + t := &Revision{ + clock: clock, retention: retention, rg: rg, c: c, } + t.ctx, t.cancel = context.WithCancel(context.Background()) + return t } -func (t *Revision) Run() { - t.ctx, t.cancel = context.WithCancel(context.Background()) - clock := t.clock - previous := int64(0) +const revInterval = 5 * time.Minute +// Run runs revision-based compactor. +func (t *Revision) Run() { + prev := int64(0) go func() { for { select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactionInterval): + case <-t.clock.After(revInterval): t.mu.Lock() p := t.paused t.mu.Unlock() @@ -71,34 +78,36 @@ func (t *Revision) Run() { } rev := t.rg.Rev() - t.retention - - if rev <= 0 || rev == previous { + if rev <= 0 || rev == prev { continue } plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { - previous = rev + prev = rev plog.Noticef("Finished auto-compaction at revision %d", rev) } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", checkCompactionInterval) + plog.Noticef("Retry after %v", revInterval) } } }() } +// Stop stops revision-based compactor. func (t *Revision) Stop() { t.cancel() } +// Pause pauses revision-based compactor. func (t *Revision) Pause() { t.mu.Lock() defer t.mu.Unlock() t.paused = true } +// Resume resumes revision-based compactor. func (t *Revision) Resume() { t.mu.Lock() defer t.mu.Unlock() diff --git a/compactor/revision_test.go b/compactor/revision_test.go index 3c52f94c9..905683c36 100644 --- a/compactor/revision_test.go +++ b/compactor/revision_test.go @@ -21,6 +21,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "github.com/jonboulle/clockwork" ) @@ -28,23 +29,18 @@ func TestRevision(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := &Revision{ - clock: fc, - retention: 10, - rg: rg, - c: compactable, - } + tb := newRevision(fc, 10, rg, compactable) tb.Run() defer tb.Stop() - fc.Advance(checkCompactionInterval) + fc.Advance(revInterval) rg.Wait(1) // nothing happens rg.SetRev(99) // will be 100 expectedRevision := int64(90) - fc.Advance(checkCompactionInterval) + fc.Advance(revInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil { @@ -61,7 +57,7 @@ func TestRevision(t *testing.T) { rg.SetRev(199) // will be 200 expectedRevision = int64(190) - fc.Advance(checkCompactionInterval) + fc.Advance(revInterval) rg.Wait(1) a, err = compactable.Wait(1) if err != nil { @@ -74,22 +70,17 @@ func TestRevision(t *testing.T) { func TestRevisionPause(t *testing.T) { fc := clockwork.NewFakeClock() - compactable := &fakeCompactable{testutil.NewRecorderStream()} rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 - tb := &Revision{ - clock: fc, - retention: 10, - rg: rg, - c: compactable, - } + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := newRevision(fc, 10, rg, compactable) tb.Run() tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / checkCompactionInterval) + n := int(time.Hour / revInterval) for i := 0; i < 3*n; i++ { - fc.Advance(checkCompactionInterval) + fc.Advance(revInterval) } // tb ends up waiting for the clock @@ -103,7 +94,7 @@ func TestRevisionPause(t *testing.T) { tb.Resume() // unblock clock, will kick off a compaction at hour 3:05 - fc.Advance(checkCompactionInterval) + fc.Advance(revInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil {