etcdserver: add auto compaction interval option

With enough key revisions per second, the constant 5 minute interval on
revision compaction isn't fast enough to keep up with growth. Make the
interval configurable, which will override the default behavior.

Also pass the configuration through to periodic compaction for finer
control of when compaction happens based on time.

Fixes:
https://github.com/etcd-io/etcd/issues/18471

Signed-off-by: Justin Cichra <jcichra@cloudflare.com>
This commit is contained in:
Justin Cichra 2024-08-21 15:00:56 -04:00
parent bd93a0060a
commit 98f118adeb
11 changed files with 146 additions and 29 deletions

View File

@ -117,6 +117,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
AutoCompactionInterval time.Duration
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64

View File

@ -272,6 +272,9 @@ type Config struct {
// If no time unit is provided and compaction mode is 'periodic',
// the unit defaults to hour. For example, '5' translates into 5-hour.
AutoCompactionRetention string `json:"auto-compaction-retention"`
// AutoCompactionInterval is the delay between compaction runs.
// If no interval is specified, 'periodic' mode defaults to the retention
// (capped at 1 hour) and 'revision' mode defaults to 5 minutes.
AutoCompactionInterval string `json:"auto-compaction-interval"`
// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server
@ -724,6 +727,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")
fs.StringVar(&cfg.AutoCompactionInterval, "auto-compaction-interval", "", "Auto compaction interval for mvcc key value store. Default is based on mode selected.")
// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")

View File

@ -533,6 +533,35 @@ func TestAutoCompactionModeParse(t *testing.T) {
}
}
// TestAutoCompactionIntervalParse feeds a table of interval strings to
// parseCompactionInterval and checks, for every entry, both whether an error
// is reported and which duration comes back.
func TestAutoCompactionIntervalParse(t *testing.T) {
	cases := []struct {
		interval string
		werr     bool
		wdur     time.Duration
	}{
		// An empty interval is accepted and yields the zero duration.
		{interval: "", werr: false, wdur: 0},
		// A bare number without a unit is expected to fail.
		{interval: "1", werr: true, wdur: 0},
		{interval: "1h", werr: false, wdur: time.Hour},
		{interval: "1s", werr: false, wdur: time.Second},
		{interval: "a", werr: true, wdur: 0},
		{interval: "-1", werr: true, wdur: 0},
	}

	for idx := range cases {
		tc := cases[idx]
		got, err := parseCompactionInterval(tc.interval)
		if gotErr := err != nil; gotErr != tc.werr {
			t.Errorf("#%d: err = %v, want %v", idx, err, tc.werr)
		}
		if got != tc.wdur {
			t.Errorf("#%d: duration = %s, want %s", idx, got, tc.wdur)
		}
	}
}
func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
defer func() { getCluster = srv.GetCluster }()

View File

@ -165,6 +165,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return e, err
}
autoCompactionInterval, err := parseCompactionInterval(cfg.AutoCompactionInterval)
if err != nil {
return e, err
}
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := config.ServerConfig{
@ -189,6 +194,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
AutoCompactionInterval: autoCompactionInterval,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
@ -892,6 +898,13 @@ func (e *Etcd) GetLogger() *zap.Logger {
return l
}
// parseCompactionInterval converts the auto-compaction-interval setting into
// a time.Duration.
//
// An empty string means the option was not set: the zero duration is returned
// with a nil error. Any other value must be accepted by time.ParseDuration
// (e.g. "30s", "1h"), so a bare number without a unit is an error.
//
// Negative durations are rejected. The compactors only substitute their
// defaults for a zero interval, so a negative value would otherwise be used
// as the delay between compaction runs.
//
// On error the returned duration is always zero.
func parseCompactionInterval(interval string) (ret time.Duration, err error) {
	if interval == "" {
		return 0, nil
	}
	ret, err = time.ParseDuration(interval)
	if err != nil {
		return 0, fmt.Errorf("error parsing CompactionInterval: %w", err)
	}
	if ret < 0 {
		return 0, fmt.Errorf("invalid CompactionInterval %q: must not be negative", interval)
	}
	return ret, nil
}
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
h, err := strconv.Atoi(retention)
if err == nil && h >= 0 {

View File

@ -168,6 +168,8 @@ Clustering:
Auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--auto-compaction-interval ''
Auto compaction interval. Empty means use default based on mode selected.
--v2-deprecation '` + string(cconfig.V2DeprDefault) + `'
Phase of v2store deprecation. Allows to opt-in for higher compatibility mode.
Supported values:

View File

@ -56,6 +56,7 @@ func New(
lg *zap.Logger,
mode string,
retention time.Duration,
interval time.Duration,
rg RevGetter,
c Compactable,
) (Compactor, error) {
@ -64,9 +65,9 @@ func New(
}
switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, interval, rg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), interval, rg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}

View File

@ -30,9 +30,10 @@ import (
// Periodic compacts the log by purging revisions older than
// the configured retention time.
type Periodic struct {
lg *zap.Logger
clock clockwork.Clock
period time.Duration
lg *zap.Logger
clock clockwork.Clock
period time.Duration
interval time.Duration
rg RevGetter
c Compactable
@ -48,13 +49,14 @@ type Periodic struct {
// newPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, interval time.Duration, rg RevGetter, c Compactable) *Periodic {
pc := &Periodic{
lg: lg,
clock: clock,
period: h,
rg: rg,
c: c,
lg: lg,
clock: clock,
period: h,
interval: interval,
rg: rg,
c: c,
}
// revs won't be longer than the retentions.
pc.revs = make([]int64, 0, pc.getRetentions())
@ -162,11 +164,15 @@ func (pc *Periodic) Run() {
}()
}
// if static interval is provided, compact every x duration.
// if given compaction period x is <1-hour, compact every x duration.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (pc *Periodic) getCompactInterval() time.Duration {
if pc.interval != 0 {
return pc.interval
}
itv := pc.period
if itv > time.Hour {
itv = time.Hour

View File

@ -30,12 +30,13 @@ import (
func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
// TODO: Do not depend on real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()
@ -82,11 +83,12 @@ func TestPeriodicHourly(t *testing.T) {
func TestPeriodicMinutes(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()
@ -129,12 +131,64 @@ func TestPeriodicMinutes(t *testing.T) {
}
}
// TestPeriodicMinutesWithInterval drives a Periodic compactor, configured with
// a 10-minute retention and an explicit 2-minute interval, on a fake clock and
// checks the revision carried by each compaction request it emits.
func TestPeriodicMinutesWithInterval(t *testing.T) {
	var (
		retentionMinutes  = 10
		retentionDuration = time.Duration(retentionMinutes) * time.Minute
		intervalDuration  = 2 * time.Minute
	)

	clk := clockwork.NewFakeClock()
	revs := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
	target := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
	pc := newPeriodic(zaptest.NewLogger(t), clk, retentionDuration, intervalDuration, revs, target)

	pc.Run()
	defer pc.Stop()

	// Step the fake clock one retry interval at a time, retentionMinutes
	// times; no compaction is expected until the retention window has elapsed.
	for step := 0; step < retentionMinutes; step++ {
		revs.Wait(1)
		clk.Advance(pc.getRetryInterval())
	}

	// The very first compaction request.
	first, err := target.Wait(1)
	if err != nil {
		t.Fatal(err)
	}
	wantFirst := &pb.CompactionRequest{Revision: int64(1)}
	if got := first[0].Params[0]; !reflect.DeepEqual(got, wantFirst) {
		t.Errorf("compact request = %v, want %v", got, wantFirst)
	}

	for round := 0; round < 10; round++ {
		// Move the clock forward 20 minutes, one minute (and one revision
		// fetch) per step.
		for minute := 0; minute < 20; minute++ {
			revs.Wait(1)
			clk.Advance(time.Minute)
		}

		// Wait for the next compaction request.
		acts, waitErr := target.Wait(1)
		if waitErr != nil {
			t.Fatal(waitErr)
		}

		// With one revision recorded per minute, the expected compaction
		// revision is the current revision minus the retention in minutes.
		want := &pb.CompactionRequest{Revision: revs.rev - int64(retentionDuration.Minutes())}
		if got := acts[0].Params[0]; !reflect.DeepEqual(got, want) {
			t.Errorf("compact request = %v, want %v", got, want)
		}
	}
}
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
intervalDuration := time.Duration(0)
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
tb.Pause()
@ -177,11 +231,12 @@ func TestPeriodicPause(t *testing.T) {
func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()

View File

@ -33,6 +33,7 @@ type Revision struct {
clock clockwork.Clock
retention int64
interval time.Duration
rg RevGetter
c Compactable
@ -46,11 +47,16 @@ type Revision struct {
// newRevision creates a new instance of the Revision compactor that purges
// the log older than retention revisions from the current revision.
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, interval time.Duration, rg RevGetter, c Compactable) *Revision {
// default revision interval to 5 minutes
if interval == 0 {
interval = time.Minute * 5
}
rc := &Revision{
lg: lg,
clock: clock,
retention: retention,
interval: interval,
rg: rg,
c: c,
}
@ -58,8 +64,6 @@ func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevG
return rc
}
const revInterval = 5 * time.Minute
// Run runs revision-based compactor.
func (rc *Revision) Run() {
prev := int64(0)
@ -68,7 +72,7 @@ func (rc *Revision) Run() {
select {
case <-rc.ctx.Done():
return
case <-rc.clock.After(revInterval):
case <-rc.clock.After(rc.interval):
rc.mu.Lock()
p := rc.paused
rc.mu.Unlock()
@ -102,7 +106,7 @@ func (rc *Revision) Run() {
"failed auto revision compaction",
zap.Int64("revision", rev),
zap.Int64("revision-compaction-retention", rc.retention),
zap.Duration("retry-interval", revInterval),
zap.Duration("retry-interval", rc.interval),
zap.Error(err),
)
}

View File

@ -26,22 +26,24 @@ import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
)
const testRevInterval = 5 * time.Minute
func TestRevision(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable)
tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable)
tb.Run()
defer tb.Stop()
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
// nothing happens
rg.SetRev(99) // will be 100
expectedRevision := int64(90)
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
@ -58,7 +60,7 @@ func TestRevision(t *testing.T) {
rg.SetRev(199) // will be 200
expectedRevision = int64(190)
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err = compactable.Wait(1)
if err != nil {
@ -73,15 +75,15 @@ func TestRevisionPause(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable)
tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable)
tb.Run()
tb.Pause()
// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / revInterval)
n := int(time.Hour / testRevInterval)
for i := 0; i < 3*n; i++ {
fc.Advance(revInterval)
fc.Advance(testRevInterval)
}
// tb ends up waiting for the clock
@ -95,7 +97,7 @@ func TestRevisionPause(t *testing.T) {
tb.Resume()
// unblock clock, will kick off a compaction at hour 3:05
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {

View File

@ -384,7 +384,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, cfg.AutoCompactionInterval, srv.kv, srv)
if err != nil {
return nil, err
}