mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13018 from ipixiu/feature/bolt-compaction-sleep-interval
add bolt compaction sleep interval
This commit is contained in:
commit
05674c87fb
@ -114,6 +114,7 @@ type ServerConfig struct {
|
||||
AutoCompactionRetention time.Duration
|
||||
AutoCompactionMode string
|
||||
CompactionBatchLimit int
|
||||
CompactionSleepInterval time.Duration
|
||||
QuotaBackendBytes int64
|
||||
MaxTxnOps uint
|
||||
|
||||
|
@ -315,8 +315,10 @@ type Config struct {
|
||||
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
|
||||
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
||||
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
|
||||
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
|
||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
||||
// takes more time than this value.
|
||||
|
@ -217,6 +217,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
|
||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
||||
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
||||
|
@ -282,6 +282,7 @@ func newConfig() *config {
|
||||
|
||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
||||
|
@ -608,10 +608,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||
|
||||
mvccStoreConfig := mvcc.StoreConfig{
|
||||
CompactionBatchLimit: cfg.CompactionBatchLimit,
|
||||
CompactionSleepInterval: cfg.CompactionSleepInterval,
|
||||
}
|
||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
|
||||
|
||||
kvindex := ci.ConsistentIndex()
|
||||
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
||||
|
||||
if beExist {
|
||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||
// etcd from pre-3.0 release.
|
||||
|
@ -52,9 +52,11 @@ const (
|
||||
|
||||
var restoreChunkKeys = 10000 // non-const for testing
|
||||
var defaultCompactBatchLimit = 1000
|
||||
var minimumBatchInterval = 10 * time.Millisecond
|
||||
|
||||
type StoreConfig struct {
|
||||
CompactionBatchLimit int
|
||||
CompactionBatchLimit int
|
||||
CompactionSleepInterval time.Duration
|
||||
}
|
||||
|
||||
type store struct {
|
||||
@ -96,6 +98,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
||||
if cfg.CompactionBatchLimit == 0 {
|
||||
cfg.CompactionBatchLimit = defaultCompactBatchLimit
|
||||
}
|
||||
if cfg.CompactionSleepInterval == 0 {
|
||||
cfg.CompactionSleepInterval = minimumBatchInterval
|
||||
}
|
||||
s := &store{
|
||||
cfg: cfg,
|
||||
b: b,
|
||||
|
@ -32,6 +32,9 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
end := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
||||
|
||||
batchNum := s.cfg.CompactionBatchLimit
|
||||
batchInterval := s.cfg.CompactionSleepInterval
|
||||
|
||||
last := make([]byte, 8+1+8)
|
||||
for {
|
||||
var rev revision
|
||||
@ -40,7 +43,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
|
||||
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
|
||||
for _, key := range keys {
|
||||
rev = bytesToRev(key)
|
||||
if _, ok := keep[rev]; !ok {
|
||||
@ -49,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
}
|
||||
}
|
||||
|
||||
if len(keys) < s.cfg.CompactionBatchLimit {
|
||||
if len(keys) < batchNum {
|
||||
rbytes := make([]byte, 8+1+8)
|
||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
|
||||
@ -70,7 +73,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
case <-time.After(batchInterval):
|
||||
case <-s.stopc:
|
||||
return false
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user