From f269c42aad6bd4b98b6f612a7113a6f1a17feed0 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 26 Apr 2018 14:21:15 -0700 Subject: [PATCH] compactor: support structured logger Signed-off-by: Gyuho Lee --- compactor/compactor.go | 15 ++++- compactor/periodic.go | 124 ++++++++++++++++++++++--------------- compactor/periodic_test.go | 7 ++- compactor/revision.go | 90 +++++++++++++++++---------- compactor/revision_test.go | 5 +- 5 files changed, 153 insertions(+), 88 deletions(-) diff --git a/compactor/compactor.go b/compactor/compactor.go index 8100b6938..8faf8d300 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -22,6 +22,8 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/pkg/capnslog" + "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) var ( @@ -54,12 +56,19 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { +// New returns a new Compactor based on given "mode". +func New( + lg *zap.Logger, + mode string, + retention time.Duration, + rg RevGetter, + c Compactable, +) (Compactor, error) { switch mode { case ModePeriodic: - return NewPeriodic(retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil case ModeRevision: - return NewRevision(int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/compactor/periodic.go b/compactor/periodic.go index 9d9164e9c..466cda4bd 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -23,11 +23,13 @@ import ( "github.com/coreos/etcd/mvcc" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) // Periodic compacts the log by purging revisions older than // the configured retention time. type Periodic struct { + lg *zap.Logger clock clockwork.Clock period time.Duration @@ -43,22 +45,19 @@ type Periodic struct { paused bool } -// NewPeriodic creates a new instance of Periodic compactor that purges +// newPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. -func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { - return newPeriodic(clockwork.NewRealClock(), h, rg, c) -} - -func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { - t := &Periodic{ +func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { + pc := &Periodic{ + lg: lg, clock: clock, period: h, rg: rg, c: c, revs: make([]int64, 0), } - t.ctx, t.cancel = context.WithCancel(context.Background()) - return t + pc.ctx, pc.cancel = context.WithCancel(context.Background()) + return pc } /* @@ -96,50 +95,77 @@ Compaction period 5-sec: */ // Run runs periodic compactor. -func (t *Periodic) Run() { - compactInterval := t.getCompactInterval() - retryInterval := t.getRetryInterval() - retentions := t.getRetentions() +func (pc *Periodic) Run() { + compactInterval := pc.getCompactInterval() + retryInterval := pc.getRetryInterval() + retentions := pc.getRetentions() go func() { - lastSuccess := t.clock.Now() - baseInterval := t.period + lastSuccess := pc.clock.Now() + baseInterval := pc.period for { - t.revs = append(t.revs, t.rg.Rev()) - if len(t.revs) > retentions { - t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago + pc.revs = append(pc.revs, pc.rg.Rev()) + if len(pc.revs) > retentions { + pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago } select { - case <-t.ctx.Done(): + case <-pc.ctx.Done(): return - case <-t.clock.After(retryInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() + case <-pc.clock.After(retryInterval): + pc.mu.Lock() + p := pc.paused + pc.mu.Unlock() if p { continue } } - if t.clock.Now().Sub(lastSuccess) < baseInterval { + if pc.clock.Now().Sub(lastSuccess) < baseInterval { continue } // wait up to initial given period - if baseInterval == t.period { + if baseInterval == pc.period { baseInterval = compactInterval } - rev := t.revs[0] + rev := pc.revs[0] - plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil || err == mvcc.ErrCompacted { - lastSuccess = t.clock.Now() - plog.Noticef("Finished auto-compaction at revision %d", rev) + if pc.lg != nil { + pc.lg.Info( + "starting auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + ) } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", retryInterval) + plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period) + } + _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + if pc.lg != nil { + pc.lg.Info( + "completed auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + zap.Duration("took", time.Since(lastSuccess)), + ) + } else { + plog.Noticef("Finished auto-compaction at revision %d", rev) + } + lastSuccess = pc.clock.Now() + } else { + if pc.lg != nil { + pc.lg.Warn( + "failed auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + zap.Duration("retry-interval", retryInterval), + zap.Error(err), + ) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", retryInterval) + } } } }() @@ -149,22 +175,22 @@ func (t *Periodic) Run() { // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) -func (t *Periodic) getCompactInterval() time.Duration { - itv := t.period +func (pc *Periodic) getCompactInterval() time.Duration { + itv := pc.period if itv > time.Hour { itv = time.Hour } return itv } -func (t *Periodic) getRetentions() int { - return int(t.period/t.getRetryInterval()) + 1 +func (pc *Periodic) getRetentions() int { + return int(pc.period/pc.getRetryInterval()) + 1 } const retryDivisor = 10 -func (t *Periodic) getRetryInterval() time.Duration { - itv := t.period +func (pc *Periodic) getRetryInterval() time.Duration { + itv := pc.period if itv > time.Hour { itv = time.Hour } @@ -172,20 +198,20 @@ func (t *Periodic) getRetryInterval() time.Duration { } // Stop stops periodic compactor. -func (t *Periodic) Stop() { - t.cancel() +func (pc *Periodic) Stop() { + pc.cancel() } // Pause pauses periodic compactor. -func (t *Periodic) Pause() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = true +func (pc *Periodic) Pause() { + pc.mu.Lock() + pc.paused = true + pc.mu.Unlock() } // Resume resumes periodic compactor. -func (t *Periodic) Resume() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = false +func (pc *Periodic) Resume() { + pc.mu.Lock() + pc.paused = false + pc.mu.Unlock() } diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index 21e539e76..fdf90e5df 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/pkg/testutil" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) func TestPeriodicHourly(t *testing.T) { @@ -32,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -83,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -131,7 +132,7 @@ func TestPeriodicPause(t *testing.T) { retentionDuration := time.Hour rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() tb.Pause() diff --git a/compactor/revision.go b/compactor/revision.go index 927e41c97..ace79c387 100644 --- a/compactor/revision.go +++ b/compactor/revision.go @@ -23,11 +23,14 @@ import ( "github.com/coreos/etcd/mvcc" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) // Revision compacts the log by purging revisions older than // the configured reivison number. Compaction happens every 5 minutes. type Revision struct { + lg *zap.Logger + clock clockwork.Clock retention int64 @@ -41,75 +44,100 @@ type Revision struct { paused bool } -// NewRevision creates a new instance of Revisonal compactor that purges +// newRevision creates a new instance of Revisonal compactor that purges // the log older than retention revisions from the current revision. -func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision { - return newRevision(clockwork.NewRealClock(), retention, rg, c) -} - -func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { - t := &Revision{ +func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { + rc := &Revision{ + lg: lg, clock: clock, retention: retention, rg: rg, c: c, } - t.ctx, t.cancel = context.WithCancel(context.Background()) - return t + rc.ctx, rc.cancel = context.WithCancel(context.Background()) + return rc } const revInterval = 5 * time.Minute // Run runs revision-based compactor. -func (t *Revision) Run() { +func (rc *Revision) Run() { prev := int64(0) go func() { for { select { - case <-t.ctx.Done(): + case <-rc.ctx.Done(): return - case <-t.clock.After(revInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() + case <-rc.clock.After(revInterval): + rc.mu.Lock() + p := rc.paused + rc.mu.Unlock() if p { continue } } - rev := t.rg.Rev() - t.retention + rev := rc.rg.Rev() - rc.retention if rev <= 0 || rev == prev { continue } - plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + now := time.Now() + if rc.lg != nil { + rc.lg.Info( + "starting auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + ) + } else { + plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, rc.retention) + } + _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { prev = rev - plog.Noticef("Finished auto-compaction at revision %d", rev) + if rc.lg != nil { + rc.lg.Info( + "completed auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + zap.Duration("took", time.Since(now)), + ) + } else { + plog.Noticef("Finished auto-compaction at revision %d", rev) + } } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", revInterval) + if rc.lg != nil { + rc.lg.Warn( + "failed auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + zap.Duration("retry-interval", revInterval), + zap.Error(err), + ) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", revInterval) + } } } }() } // Stop stops revision-based compactor. -func (t *Revision) Stop() { - t.cancel() +func (rc *Revision) Stop() { + rc.cancel() } // Pause pauses revision-based compactor. -func (t *Revision) Pause() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = true +func (rc *Revision) Pause() { + rc.mu.Lock() + rc.paused = true + rc.mu.Unlock() } // Resume resumes revision-based compactor. -func (t *Revision) Resume() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = false +func (rc *Revision) Resume() { + rc.mu.Lock() + rc.paused = false + rc.mu.Unlock() } diff --git a/compactor/revision_test.go b/compactor/revision_test.go index 905683c36..e6bebcd49 100644 --- a/compactor/revision_test.go +++ b/compactor/revision_test.go @@ -23,13 +23,14 @@ import ( "github.com/coreos/etcd/pkg/testutil" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) func TestRevision(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(fc, 10, rg, compactable) + tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) tb.Run() defer tb.Stop() @@ -72,7 +73,7 @@ func TestRevisionPause(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(fc, 10, rg, compactable) + tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) tb.Run() tb.Pause()