mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Introduce compaction sleep interval flag
This is a backporting cherry-pick of the following commits: - add flagsline Signed-off-by: Jalin Wang <JalinWang@outlook.com> - etcdserver: rename defaultCompactionSleepInterval var (#18495) etcdserver: rename `minimumBatchInterval` to `defaultCompactionSleepInterval` and `defaultCompactBatchLimit` to `defaultCompactionBatchLimit` Signed-off-by: Jalin Wang <JalinWang@outlook.com> (cherry picked from commit 2c53be7c5d91e9b618342d63d2d06c3b265abee4) - test: add CompactionSleepInterval in FakeStore's config After setting the ComparionSleepInterval, we can use time.Ticker instead of time.After to optimize the scheduleComparison(), otherwise it will fail in the 'TestStoreCompact(t)' test. Signed-off-by: guozhao <guozhao@360.cn> (cherry picked from commit fab8474ef8370a089fe0174e28c197b5b93898df) - add sleep interval (cherry picked from commit 184b0e5d4964f1115590acec50fa5e584a2f7770) Signed-off-by: Jalin Wang <JalinWang@outlook.com>
This commit is contained in:
parent
f739ef40c8
commit
0263597ba8
@ -114,6 +114,7 @@ type ServerConfig struct {
|
|||||||
AutoCompactionRetention time.Duration
|
AutoCompactionRetention time.Duration
|
||||||
AutoCompactionMode string
|
AutoCompactionMode string
|
||||||
CompactionBatchLimit int
|
CompactionBatchLimit int
|
||||||
|
CompactionSleepInterval time.Duration
|
||||||
QuotaBackendBytes int64
|
QuotaBackendBytes int64
|
||||||
MaxTxnOps uint
|
MaxTxnOps uint
|
||||||
|
|
||||||
|
|||||||
@ -336,6 +336,8 @@ type Config struct {
|
|||||||
// TODO: Delete in v3.7
|
// TODO: Delete in v3.7
|
||||||
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
|
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
|
||||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||||
|
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
|
||||||
|
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
|
||||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||||
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
||||||
// takes more time than this value.
|
// takes more time than this value.
|
||||||
|
|||||||
@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||||
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
|
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
|
||||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||||
|
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
|
||||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||||
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
||||||
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
||||||
|
|||||||
@ -296,6 +296,7 @@ func newConfig() *config {
|
|||||||
// TODO: delete in v3.7
|
// TODO: delete in v3.7
|
||||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
|
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
|
||||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||||
|
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
||||||
|
|||||||
@ -268,6 +268,8 @@ Experimental feature:
|
|||||||
Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.
|
Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.
|
||||||
--experimental-compaction-batch-limit 1000
|
--experimental-compaction-batch-limit 1000
|
||||||
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
||||||
|
--experimental-compaction-sleep-interval '10ms'
|
||||||
|
ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch.
|
||||||
--experimental-peer-skip-client-san-verification 'false'
|
--experimental-peer-skip-client-san-verification 'false'
|
||||||
Skip verification of SAN field in client certificate for peer connections.
|
Skip verification of SAN field in client certificate for peer connections.
|
||||||
--experimental-watch-progress-notify-interval '10m'
|
--experimental-watch-progress-notify-interval '10m'
|
||||||
|
|||||||
@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
|
||||||
|
mvccStoreConfig := mvcc.StoreConfig{
|
||||||
|
CompactionBatchLimit: cfg.CompactionBatchLimit,
|
||||||
|
CompactionSleepInterval: cfg.CompactionSleepInterval,
|
||||||
|
}
|
||||||
|
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
|
||||||
|
|
||||||
kvindex := ci.ConsistentIndex()
|
kvindex := ci.ConsistentIndex()
|
||||||
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
||||||
|
|
||||||
if beExist {
|
if beExist {
|
||||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||||
// etcd from pre-3.0 release.
|
// etcd from pre-3.0 release.
|
||||||
|
|||||||
@ -50,10 +50,12 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var restoreChunkKeys = 10000 // non-const for testing
|
var restoreChunkKeys = 10000 // non-const for testing
|
||||||
var defaultCompactBatchLimit = 1000
|
var defaultCompactionBatchLimit = 1000
|
||||||
|
var defaultCompactionSleepInterval = 10 * time.Millisecond
|
||||||
|
|
||||||
type StoreConfig struct {
|
type StoreConfig struct {
|
||||||
CompactionBatchLimit int
|
CompactionBatchLimit int
|
||||||
|
CompactionSleepInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type store struct {
|
type store struct {
|
||||||
@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
|||||||
lg = zap.NewNop()
|
lg = zap.NewNop()
|
||||||
}
|
}
|
||||||
if cfg.CompactionBatchLimit == 0 {
|
if cfg.CompactionBatchLimit == 0 {
|
||||||
cfg.CompactionBatchLimit = defaultCompactBatchLimit
|
cfg.CompactionBatchLimit = defaultCompactionBatchLimit
|
||||||
|
}
|
||||||
|
if cfg.CompactionSleepInterval == 0 {
|
||||||
|
cfg.CompactionSleepInterval = defaultCompactionSleepInterval
|
||||||
}
|
}
|
||||||
s := &store{
|
s := &store{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
|||||||
@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
|||||||
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
||||||
|
|
||||||
batchNum := s.cfg.CompactionBatchLimit
|
batchNum := s.cfg.CompactionBatchLimit
|
||||||
|
batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval)
|
||||||
|
defer batchTicker.Stop()
|
||||||
h := newKVHasher(prevCompactRev, compactMainRev, keep)
|
h := newKVHasher(prevCompactRev, compactMainRev, keep)
|
||||||
last := make([]byte, 8+1+8)
|
last := make([]byte, 8+1+8)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var rev revision
|
var rev revision
|
||||||
|
|
||||||
@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
|||||||
h.WriteKeyValue(keys[i], values[i])
|
h.WriteKeyValue(keys[i], values[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(keys) < s.cfg.CompactionBatchLimit {
|
if len(keys) < batchNum {
|
||||||
// gofail: var compactBeforeSetFinishedCompact struct{}
|
// gofail: var compactBeforeSetFinishedCompact struct{}
|
||||||
rbytes := make([]byte, 8+1+8)
|
rbytes := make([]byte, 8+1+8)
|
||||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||||
@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
|
|||||||
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(10 * time.Millisecond):
|
case <-batchTicker.C:
|
||||||
case <-s.stopc:
|
case <-s.stopc:
|
||||||
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
|
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -910,7 +910,10 @@ func newFakeStore() *store {
|
|||||||
Recorder: &testutil.RecorderBuffered{},
|
Recorder: &testutil.RecorderBuffered{},
|
||||||
rangeRespc: make(chan rangeResp, 5)}}
|
rangeRespc: make(chan rangeResp, 5)}}
|
||||||
s := &store{
|
s := &store{
|
||||||
cfg: StoreConfig{CompactionBatchLimit: 10000},
|
cfg: StoreConfig{
|
||||||
|
CompactionBatchLimit: 10000,
|
||||||
|
CompactionSleepInterval: defaultCompactionSleepInterval,
|
||||||
|
},
|
||||||
b: b,
|
b: b,
|
||||||
le: &lease.FakeLessor{},
|
le: &lease.FakeLessor{},
|
||||||
kvindex: newFakeIndex(),
|
kvindex: newFakeIndex(),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user