mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #18514 from JalinWang/backport/release-3.5
[3.5] Introduce the CompactionSleepInterval flag
This commit is contained in:
commit
1d4372addc
@ -114,6 +114,7 @@ type ServerConfig struct {
|
||||
AutoCompactionRetention time.Duration
|
||||
AutoCompactionMode string
|
||||
CompactionBatchLimit int
|
||||
CompactionSleepInterval time.Duration
|
||||
QuotaBackendBytes int64
|
||||
MaxTxnOps uint
|
||||
|
||||
|
@ -334,9 +334,11 @@ type Config struct {
|
||||
// Requires experimental-enable-lease-checkpoint to be enabled.
|
||||
// Deprecated in v3.6.
|
||||
// TODO: Delete in v3.7
|
||||
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
|
||||
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
|
||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
||||
// takes more time than this value.
|
||||
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
|
||||
|
@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
|
||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
|
||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
||||
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
||||
|
@ -296,6 +296,7 @@ func newConfig() *config {
|
||||
// TODO: delete in v3.7
|
||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
||||
|
@ -268,6 +268,8 @@ Experimental feature:
|
||||
Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.
|
||||
--experimental-compaction-batch-limit 1000
|
||||
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
||||
--experimental-compaction-sleep-interval '10ms'
|
||||
ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch.
|
||||
--experimental-peer-skip-client-san-verification 'false'
|
||||
Skip verification of SAN field in client certificate for peer connections.
|
||||
--experimental-watch-progress-notify-interval '10m'
|
||||
|
@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||
|
||||
mvccStoreConfig := mvcc.StoreConfig{
|
||||
CompactionBatchLimit: cfg.CompactionBatchLimit,
|
||||
CompactionSleepInterval: cfg.CompactionSleepInterval,
|
||||
}
|
||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
|
||||
|
||||
kvindex := ci.ConsistentIndex()
|
||||
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
||||
|
||||
if beExist {
|
||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||
// etcd from pre-3.0 release.
|
||||
|
@ -50,10 +50,12 @@ const (
|
||||
)
|
||||
|
||||
var restoreChunkKeys = 10000 // non-const for testing
|
||||
var defaultCompactBatchLimit = 1000
|
||||
var defaultCompactionBatchLimit = 1000
|
||||
var defaultCompactionSleepInterval = 10 * time.Millisecond
|
||||
|
||||
type StoreConfig struct {
|
||||
CompactionBatchLimit int
|
||||
CompactionBatchLimit int
|
||||
CompactionSleepInterval time.Duration
|
||||
}
|
||||
|
||||
type store struct {
|
||||
@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
if cfg.CompactionBatchLimit == 0 {
|
||||
cfg.CompactionBatchLimit = defaultCompactBatchLimit
|
||||
cfg.CompactionBatchLimit = defaultCompactionBatchLimit
|
||||
}
|
||||
if cfg.CompactionSleepInterval == 0 {
|
||||
cfg.CompactionSleepInterval = defaultCompactionSleepInterval
|
||||
}
|
||||
s := &store{
|
||||
cfg: cfg,
|
||||
|
@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
||||
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
||||
|
||||
batchNum := s.cfg.CompactionBatchLimit
|
||||
batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval)
|
||||
defer batchTicker.Stop()
|
||||
h := newKVHasher(prevCompactRev, compactMainRev, keep)
|
||||
last := make([]byte, 8+1+8)
|
||||
|
||||
for {
|
||||
var rev revision
|
||||
|
||||
@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
||||
h.WriteKeyValue(keys[i], values[i])
|
||||
}
|
||||
|
||||
if len(keys) < s.cfg.CompactionBatchLimit {
|
||||
if len(keys) < batchNum {
|
||||
// gofail: var compactBeforeSetFinishedCompact struct{}
|
||||
rbytes := make([]byte, 8+1+8)
|
||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||
@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
||||
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
case <-batchTicker.C:
|
||||
case <-s.stopc:
|
||||
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
|
||||
}
|
||||
|
@ -910,7 +910,10 @@ func newFakeStore() *store {
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
rangeRespc: make(chan rangeResp, 5)}}
|
||||
s := &store{
|
||||
cfg: StoreConfig{CompactionBatchLimit: 10000},
|
||||
cfg: StoreConfig{
|
||||
CompactionBatchLimit: 10000,
|
||||
CompactionSleepInterval: defaultCompactionSleepInterval,
|
||||
},
|
||||
b: b,
|
||||
le: &lease.FakeLessor{},
|
||||
kvindex: newFakeIndex(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user