Merge 98f118adebc3651fdde0bc342fe4f35240d0b963 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Justin Cichra 2024-09-26 09:29:34 +08:00 committed by GitHub
commit e4ddacf21b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 146 additions and 29 deletions

View File

@ -117,6 +117,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
AutoCompactionInterval time.Duration
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64

View File

@ -272,6 +272,9 @@ type Config struct {
// If no time unit is provided and compaction mode is 'periodic',
// the unit defaults to hour. For example, '5' translates into 5-hour.
AutoCompactionRetention string `json:"auto-compaction-retention"`
// AutoCompactionInterval is the delay between compaction runs.
// If no interval is specified 'periodic' defaults to retention, revision defaults to 5 minutes
AutoCompactionInterval string `json:"auto-compaction-interval"`
// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server
@ -724,6 +727,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")
fs.StringVar(&cfg.AutoCompactionInterval, "auto-compaction-interval", "", "Auto compaction interval for mvcc key value store. Default is based on mode selected.")
// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")

View File

@ -533,6 +533,35 @@ func TestAutoCompactionModeParse(t *testing.T) {
}
}
func TestAutoCompactionIntervalParse(t *testing.T) {
tests := []struct {
interval string
werr bool
wdur time.Duration
}{
{"", false, 0},
{"1", true, 0},
{"1h", false, time.Hour},
{"1s", false, time.Second},
{"a", true, 0},
{"-1", true, 0},
}
hasErr := func(err error) bool {
return err != nil
}
for i, tt := range tests {
dur, err := parseCompactionInterval(tt.interval)
if hasErr(err) != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
if dur != tt.wdur {
t.Errorf("#%d: duration = %s, want %s", i, dur, tt.wdur)
}
}
}
func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
defer func() { getCluster = srv.GetCluster }()

View File

@ -165,6 +165,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return e, err
}
autoCompactionInterval, err := parseCompactionInterval(cfg.AutoCompactionInterval)
if err != nil {
return e, err
}
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := config.ServerConfig{
@ -189,6 +194,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
AutoCompactionInterval: autoCompactionInterval,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
@ -892,6 +898,13 @@ func (e *Etcd) GetLogger() *zap.Logger {
return l
}
func parseCompactionInterval(interval string) (ret time.Duration, err error) {
if interval == "" {
return ret, nil
}
return time.ParseDuration(interval)
}
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
h, err := strconv.Atoi(retention)
if err == nil && h >= 0 {

View File

@ -168,6 +168,8 @@ Clustering:
Auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--auto-compaction-interval ''
Auto compaction interval. Empty means use default based on mode selected.
--v2-deprecation '` + string(cconfig.V2DeprDefault) + `'
Phase of v2store deprecation. Allows to opt-in for higher compatibility mode.
Supported values:

View File

@ -56,6 +56,7 @@ func New(
lg *zap.Logger,
mode string,
retention time.Duration,
interval time.Duration,
rg RevGetter,
c Compactable,
) (Compactor, error) {
@ -64,9 +65,9 @@ func New(
}
switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, interval, rg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), interval, rg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}

View File

@ -30,9 +30,10 @@ import (
// Periodic compacts the log by purging revisions older than
// the configured retention time.
type Periodic struct {
lg *zap.Logger
clock clockwork.Clock
period time.Duration
lg *zap.Logger
clock clockwork.Clock
period time.Duration
interval time.Duration
rg RevGetter
c Compactable
@ -48,13 +49,14 @@ type Periodic struct {
// newPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, interval time.Duration, rg RevGetter, c Compactable) *Periodic {
pc := &Periodic{
lg: lg,
clock: clock,
period: h,
rg: rg,
c: c,
lg: lg,
clock: clock,
period: h,
interval: interval,
rg: rg,
c: c,
}
// revs won't be longer than the retentions.
pc.revs = make([]int64, 0, pc.getRetentions())
@ -162,11 +164,15 @@ func (pc *Periodic) Run() {
}()
}
// if static interval is provided, compact every x duration.
// if given compaction period x is <1-hour, compact every x duration.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (pc *Periodic) getCompactInterval() time.Duration {
if pc.interval != 0 {
return pc.interval
}
itv := pc.period
if itv > time.Hour {
itv = time.Hour

View File

@ -30,12 +30,13 @@ import (
func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()
@ -82,11 +83,12 @@ func TestPeriodicHourly(t *testing.T) {
func TestPeriodicMinutes(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()
@ -129,12 +131,64 @@ func TestPeriodicMinutes(t *testing.T) {
}
}
func TestPeriodicMinutesWithInterval(t *testing.T) {
retentionMinutes := 10
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := 2 * time.Minute
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()
// compaction doesn't happen til 10 minutes elapse
for i := 0; i < retentionMinutes; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
// very first compaction
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
for i := 0; i < 10; i++ {
// advance 20 minutes, one revision for each minute
for j := 0; j < 20; j++ {
rg.Wait(1)
fc.Advance(1 * time.Minute)
}
// compact
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// the expected revision is the current revision minus the retention duration
// since we made a revision every minute
expectedRevision := rg.rev - int64(retentionDuration.Minutes())
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
intervalDuration := time.Duration(0)
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
tb.Pause()
@ -177,11 +231,12 @@ func TestPeriodicPause(t *testing.T) {
func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)
tb.Run()
defer tb.Stop()

View File

@ -33,6 +33,7 @@ type Revision struct {
clock clockwork.Clock
retention int64
interval time.Duration
rg RevGetter
c Compactable
@ -46,11 +47,16 @@ type Revision struct {
// newRevision creates a new instance of Revisonal compactor that purges
// the log older than retention revisions from the current revision.
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, interval time.Duration, rg RevGetter, c Compactable) *Revision {
// default revision interval to 5 minutes
if interval == 0 {
interval = time.Minute * 5
}
rc := &Revision{
lg: lg,
clock: clock,
retention: retention,
interval: interval,
rg: rg,
c: c,
}
@ -58,8 +64,6 @@ func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevG
return rc
}
const revInterval = 5 * time.Minute
// Run runs revision-based compactor.
func (rc *Revision) Run() {
prev := int64(0)
@ -68,7 +72,7 @@ func (rc *Revision) Run() {
select {
case <-rc.ctx.Done():
return
case <-rc.clock.After(revInterval):
case <-rc.clock.After(rc.interval):
rc.mu.Lock()
p := rc.paused
rc.mu.Unlock()
@ -102,7 +106,7 @@ func (rc *Revision) Run() {
"failed auto revision compaction",
zap.Int64("revision", rev),
zap.Int64("revision-compaction-retention", rc.retention),
zap.Duration("retry-interval", revInterval),
zap.Duration("retry-interval", rc.interval),
zap.Error(err),
)
}

View File

@ -26,22 +26,24 @@ import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
)
const testRevInterval = 5 * time.Minute
func TestRevision(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable)
tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable)
tb.Run()
defer tb.Stop()
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
// nothing happens
rg.SetRev(99) // will be 100
expectedRevision := int64(90)
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
@ -58,7 +60,7 @@ func TestRevision(t *testing.T) {
rg.SetRev(199) // will be 200
expectedRevision = int64(190)
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err = compactable.Wait(1)
if err != nil {
@ -73,15 +75,15 @@ func TestRevisionPause(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable)
tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable)
tb.Run()
tb.Pause()
// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / revInterval)
n := int(time.Hour / testRevInterval)
for i := 0; i < 3*n; i++ {
fc.Advance(revInterval)
fc.Advance(testRevInterval)
}
// tb ends up waiting for the clock
@ -95,7 +97,7 @@ func TestRevisionPause(t *testing.T) {
tb.Resume()
// unblock clock, will kick off a compaction at hour 3:05
fc.Advance(revInterval)
fc.Advance(testRevInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {

View File

@ -384,7 +384,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, cfg.AutoCompactionInterval, srv.kv, srv)
if err != nil {
return nil, err
}